From 613873887d2dd3430054ff59ee92276878673a59 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Fri, 19 May 2023 05:10:19 -0400 Subject: [PATCH 01/28] Support creating and using indices --- greenplumpython/dataframe.py | 60 +++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index a03530fe..e5b07a51 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -68,6 +68,7 @@ def __init__( parents: List["DataFrame"] = [], db: Optional[Database] = None, columns: Optional[Iterable[Column]] = None, + is_saved: bool = False, ) -> None: # FIXME: Add doc # noqa @@ -76,11 +77,16 @@ def __init__( self._name = "cte_" + uuid4().hex self._columns = columns self._contents: Optional[Iterable[RealDictRow]] = None + self._is_saved = is_saved if any(parents): self._db = next(iter(parents))._db else: self._db = db + @property + def is_saved(self) -> bool: + return self._is_saved + @singledispatchmethod def _getitem(self, _) -> "DataFrame": raise NotImplementedError() @@ -847,7 +853,7 @@ def _fetch(self, is_all: bool = True) -> Iterable[Tuple[Any]]: parents=[self], ) result = self._db._execute(to_json_dataframe._build_full_query()) - return result if result is not None else [] + return result if isinstance(result, Iterable) else [] def save_as( self, @@ -915,35 +921,44 @@ def save_as( storage_parameters = ( f"WITH ({','.join([f'{key}={storage_params[key]}' for key in storage_params.keys()])})" ) - df_full_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"' + qualified_table_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"' self._db._execute( f""" - CREATE {'TEMP' if temp else ''} TABLE {df_full_name} + CREATE {'TEMP' if temp else ''} TABLE {qualified_table_name} ({','.join(column_names)}) {storage_parameters if storage_params else ''} AS {self._build_full_query()} """, has_results=False, ) - return DataFrame.from_table(table_name, self._db) - - # TODO: Uncomment or remove this. - # - # def create_index( - # self, - # columns: Iterable[Union["Column", str]], - # method: str = "btree", - # name: Optional[str] = None, - # ) -> None: - # if not self._in_catalog(): - # raise Exception("Cannot create index on dataframes not in the system catalog.") - # index_name: str = name if name is not None else "idx_" + uuid4().hex - # indexed_cols = ",".join([str(col) for col in columns]) - # assert self._db is not None - # self._db._execute( - # f"CREATE INDEX {index_name} ON {self.name} USING {method} ({indexed_cols})", - # has_results=False, - # ) + return DataFrame.from_table(table_name, self._db, schema=schema) + + def create_index( + self, + columns: List[str], + method: str = "btree", + name: Optional[str] = None, + ) -> "DataFrame": + assert self.is_saved, "Cannot create index for unsaved dataframe." + assert len(columns) > 0, "Column set to be indexed cannot be empty." + + index_name: str = "idx_" + uuid4().hex if name is None else name + qualified_table_name = ( + f'"{self._schema}"."{self._name}"' if self._schema is not None else f'"{self._name}"' + ) + key_cols = [] + for name in columns: + col: Column = self[name] + key_cols.append(col._serialize()) + + assert self._db is not None + self._db._execute( + f'CREATE INDEX "{index_name}" ON {qualified_table_name} USING "{method}" (' + f' {",".join(key_cols)}' + f")", + has_results=False, + ) + return self def _explain(self, format: str = "TEXT") -> Iterable[Tuple[str]]: """ @@ -1036,6 +1051,7 @@ def from_table(cls, table_name: str, db: Database, schema: Optional[str] = None) return DataFrame( f'TABLE "{schema}"."{table_name}"' if schema is not None else f'TABLE "{table_name}"', db=db, + is_saved=True, ) @classmethod From 6a1d2158b6fb5aad26f02f3c3648007d211ef4bc Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Sun, 21 May 2023 21:51:28 -0400 Subject: [PATCH 02/28] Add support for operators and tests --- greenplumpython/__init__.py | 1 + greenplumpython/op.py | 27 ++++++++++++++++++ tests/test_index.py | 57 +++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 greenplumpython/op.py create mode 100644 tests/test_index.py diff --git a/greenplumpython/__init__.py b/greenplumpython/__init__.py index 5a2e8b03..666e648e 100644 --- a/greenplumpython/__init__.py +++ b/greenplumpython/__init__.py @@ -6,4 +6,5 @@ from greenplumpython.func import create_column_function # type: ignore from greenplumpython.func import create_function # type: ignore from greenplumpython.func import aggregate_function, function +from greenplumpython.op import operator from greenplumpython.type import type_ diff --git a/greenplumpython/op.py b/greenplumpython/op.py new file mode 100644 index 00000000..6a501990 --- /dev/null +++ b/greenplumpython/op.py @@ -0,0 +1,27 @@ +from typing import Any, Union, Optional +from greenplumpython.expr import BinaryExpr, UnaryExpr + + +class Operator: + def __init__(self, name: str, schema: Optional[str]) -> None: + self._name = name + self._schema = schema + + @property + def qualified_name(self) -> str: + if self._schema is not None: + return f'OPERATOR("{self._schema}".{self._name})' + else: + return f"OPERATOR({self._name})" + + def __call__(self, *args: Any) -> Union[UnaryExpr, BinaryExpr]: + if len(args) == 1: + return UnaryExpr(self.qualified_name, args[0]) + if len(args) == 2: + return BinaryExpr(self.qualified_name, args[0], args[1]) + else: + raise Exception("Too many operands.") + + +def operator(name: str, schema: Optional[str]) -> Operator: + return Operator(name, schema) diff --git a/tests/test_index.py b/tests/test_index.py new file mode 100644 index 00000000..e1ec9249 --- /dev/null +++ b/tests/test_index.py @@ -0,0 +1,57 @@ +import pytest +from tests import db + +import greenplumpython as gp +from typing import List + + +def test_op_on_consts(db: gp.Database): + regex_match = gp.operator("~") + result = db.assign(is_matched=lambda: regex_match("hello", "h.*o")) + assert len(list(result)) == 1 and next(iter(result))["is_matched"] + + +def test_op_index(db: gp.Database): + import json + import dataclasses + + @dataclasses.dataclass + class Student: + name: str + courses: List[str] + + john = Student("john", ["math", "english"]) + jsonb = gp.type_("jsonb") + rows = [(jsonb(json.dumps(john.__dict__)),)] + student = ( + db.create_dataframe(rows=rows, column_names=["info"]) + .save_as("student", temp=True, column_names=["info"]) + .create_index(["info"], "gin") + ) + + db._execute("SET enable_seqscan TO False", has_results=False) + json_contains = gp.operator("@>") + results = student[lambda t: json_contains(t["info"], json.dumps({"name": "john"}))]._explain() + uses_index_scan = False + for row in results: + if "Index Scan" in row["QUERY PLAN"]: + uses_index_scan = True + break + assert uses_index_scan + + +def test_op_with_schema(db: gp.Database): + my_add = gp.operator("+") + result = db.assign(add=lambda: my_add(1, 2)) + for row in result: + assert row["add"] == 3 + wrong_add = gp.operator("+", "pg_catalog") + result = db.assign(add=lambda: wrong_add(1, 2)) + with pytest.raises(Exception) as exc_info: + print(result) + assert "cross-database references are not implemented: pg_catalog.pg_catalog." in str( + exc_info.value + ) + + +# FIXME : Add test for unary operator From 6b04ee3aa44f06b596d834cf9c6a33388061e4f4 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Sun, 21 May 2023 21:55:25 -0400 Subject: [PATCH 03/28] Code lint --- greenplumpython/op.py | 3 ++- tests/test_index.py | 13 +++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/greenplumpython/op.py b/greenplumpython/op.py index 6a501990..74f481e2 100644 --- a/greenplumpython/op.py +++ b/greenplumpython/op.py @@ -1,4 +1,5 @@ -from typing import Any, Union, Optional +from typing import Any, Optional, Union + from greenplumpython.expr import BinaryExpr, UnaryExpr diff --git a/tests/test_index.py b/tests/test_index.py index e1ec9249..bc6f4fa2 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -1,8 +1,9 @@ +from typing import List + import pytest -from tests import db import greenplumpython as gp -from typing import List +from tests import db def test_op_on_consts(db: gp.Database): @@ -12,8 +13,8 @@ def test_op_on_consts(db: gp.Database): def test_op_index(db: gp.Database): - import json import dataclasses + import json @dataclasses.dataclass class Student: @@ -32,12 +33,12 @@ class Student: db._execute("SET enable_seqscan TO False", has_results=False) json_contains = gp.operator("@>") results = student[lambda t: json_contains(t["info"], json.dumps({"name": "john"}))]._explain() - uses_index_scan = False + using_index_scan = False for row in results: if "Index Scan" in row["QUERY PLAN"]: - uses_index_scan = True + using_index_scan = True break - assert uses_index_scan + assert using_index_scan def test_op_with_schema(db: gp.Database): From fe5baa7b381a35cd06f1e6df36612c3753294b4f Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 23 May 2023 00:28:31 -0400 Subject: [PATCH 04/28] Fix tests --- greenplumpython/dataframe.py | 18 +++++++++--------- greenplumpython/op.py | 4 ++-- tests/test_index.py | 13 +++++-------- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index e5b07a51..e0c04081 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -857,7 +857,7 @@ def _fetch(self, is_all: bool = True) -> Iterable[Tuple[Any]]: def save_as( self, - table_name: str, + table_name: Optional[str] = None, column_names: List[str] = [], temp: bool = False, storage_params: dict[str, Any] = {}, @@ -918,16 +918,20 @@ def save_as( # build string from parameter dict, such as from {'a': 1, 'b': 2} to # 'WITH (a=1, b=2)' - storage_parameters = ( + storage_params_clause = ( f"WITH ({','.join([f'{key}={storage_params[key]}' for key in storage_params.keys()])})" ) + if table_name is None: + table_name = self._name if not self.is_saved else "cte_" + uuid4().hex qualified_table_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"' self._db._execute( f""" CREATE {'TEMP' if temp else ''} TABLE {qualified_table_name} ({','.join(column_names)}) - {storage_parameters if storage_params else ''} - AS {self._build_full_query()} + {storage_params_clause if storage_params else ''} + AS ( + {self._build_full_query()} + ) """, has_results=False, ) @@ -946,11 +950,7 @@ def create_index( qualified_table_name = ( f'"{self._schema}"."{self._name}"' if self._schema is not None else f'"{self._name}"' ) - key_cols = [] - for name in columns: - col: Column = self[name] - key_cols.append(col._serialize()) - + key_cols = [f'"{name}"' for name in columns] assert self._db is not None self._db._execute( f'CREATE INDEX "{index_name}" ON {qualified_table_name} USING "{method}" (' diff --git a/greenplumpython/op.py b/greenplumpython/op.py index 74f481e2..2022acfd 100644 --- a/greenplumpython/op.py +++ b/greenplumpython/op.py @@ -4,7 +4,7 @@ class Operator: - def __init__(self, name: str, schema: Optional[str]) -> None: + def __init__(self, name: str, schema: Optional[str] = None) -> None: self._name = name self._schema = schema @@ -24,5 +24,5 @@ def __call__(self, *args: Any) -> Union[UnaryExpr, BinaryExpr]: raise Exception("Too many operands.") -def operator(name: str, schema: Optional[str]) -> Operator: +def operator(name: str, schema: Optional[str] = None) -> Operator: return Operator(name, schema) diff --git a/tests/test_index.py b/tests/test_index.py index bc6f4fa2..131b1839 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -26,7 +26,7 @@ class Student: rows = [(jsonb(json.dumps(john.__dict__)),)] student = ( db.create_dataframe(rows=rows, column_names=["info"]) - .save_as("student", temp=True, column_names=["info"]) + .save_as(temp=True, column_names=["info"]) .create_index(["info"], "gin") ) @@ -46,13 +46,10 @@ def test_op_with_schema(db: gp.Database): result = db.assign(add=lambda: my_add(1, 2)) for row in result: assert row["add"] == 3 - wrong_add = gp.operator("+", "pg_catalog") - result = db.assign(add=lambda: wrong_add(1, 2)) - with pytest.raises(Exception) as exc_info: - print(result) - assert "cross-database references are not implemented: pg_catalog.pg_catalog." in str( - exc_info.value - ) + qualified_add = gp.operator("+", "pg_catalog") + result = db.assign(add=lambda: qualified_add(1, 2)) + for row in result: + assert row["add"] == 3 # FIXME : Add test for unary operator From 8a3bf3fbc29f7e55a5acf04978561b07b9daa839 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 23 May 2023 04:24:56 -0400 Subject: [PATCH 05/28] Fix table name --- greenplumpython/dataframe.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index e0c04081..1fb1eb8a 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -68,16 +68,16 @@ def __init__( parents: List["DataFrame"] = [], db: Optional[Database] = None, columns: Optional[Iterable[Column]] = None, - is_saved: bool = False, + qualified_table_name: Optional[str] = None, ) -> None: # FIXME: Add doc # noqa self._query = query self._parents = parents self._name = "cte_" + uuid4().hex + self._qualified_table_name = qualified_table_name self._columns = columns self._contents: Optional[Iterable[RealDictRow]] = None - self._is_saved = is_saved if any(parents): self._db = next(iter(parents))._db else: @@ -85,7 +85,7 @@ def __init__( @property def is_saved(self) -> bool: - return self._is_saved + return self._qualified_table_name is not None @singledispatchmethod def _getitem(self, _) -> "DataFrame": @@ -947,13 +947,10 @@ def create_index( assert len(columns) > 0, "Column set to be indexed cannot be empty." index_name: str = "idx_" + uuid4().hex if name is None else name - qualified_table_name = ( - f'"{self._schema}"."{self._name}"' if self._schema is not None else f'"{self._name}"' - ) key_cols = [f'"{name}"' for name in columns] assert self._db is not None self._db._execute( - f'CREATE INDEX "{index_name}" ON {qualified_table_name} USING "{method}" (' + f'CREATE INDEX "{index_name}" ON {self._qualified_table_name} USING "{method}" (' f' {",".join(key_cols)}' f")", has_results=False, @@ -1048,11 +1045,8 @@ def from_table(cls, table_name: str, db: Database, schema: Optional[str] = None) df = gp.DataFrame.from_table("pg_class", db=db) """ - return DataFrame( - f'TABLE "{schema}"."{table_name}"' if schema is not None else f'TABLE "{table_name}"', - db=db, - is_saved=True, - ) + qualified_name = f'"{schema}"."{table_name}"' if schema is not None else f'"{table_name}"' + return DataFrame(f"TABLE {qualified_name}", db=db, qualified_table_name=qualified_name) @classmethod def from_rows( From c4bfdab63239e6da20c60ce420c0060d989428fb Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Wed, 24 May 2023 05:21:33 -0400 Subject: [PATCH 06/28] Support opcalss --- greenplumpython/dataframe.py | 34 +++++++++++------------------ greenplumpython/pandas/dataframe.py | 2 +- tests/test_index.py | 32 +++++++++++++++++++-------- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 1fb1eb8a..d25b73e5 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -85,6 +85,9 @@ def __init__( @property def is_saved(self) -> bool: + """ + Check whether the current dataframe is saved in database. + """ return self._qualified_table_name is not None @singledispatchmethod @@ -710,7 +713,7 @@ def _depth_first_search(self, t: "DataFrame", visited: Set[str], lineage: List[" self._depth_first_search(i, visited, lineage) lineage.append(t) - def _build_full_query(self) -> str: + def _serialize(self) -> str: # noqa """:meta private:""" lineage = self._list_lineage() @@ -852,7 +855,7 @@ def _fetch(self, is_all: bool = True) -> Iterable[Tuple[Any]]: f"SELECT to_json({output_name})::TEXT FROM {self._name} AS {output_name}", parents=[self], ) - result = self._db._execute(to_json_dataframe._build_full_query()) + result = self._db._execute(to_json_dataframe._serialize()) return result if isinstance(result, Iterable) else [] def save_as( @@ -930,7 +933,7 @@ def save_as( ({','.join(column_names)}) {storage_params_clause if storage_params else ''} AS ( - {self._build_full_query()} + {self._serialize()} ) """, has_results=False, @@ -939,7 +942,7 @@ def save_as( def create_index( self, - columns: List[str], + columns: Union[Set[str], Dict[str, str]], method: str = "btree", name: Optional[str] = None, ) -> "DataFrame": @@ -947,31 +950,20 @@ def create_index( assert len(columns) > 0, "Column set to be indexed cannot be empty." index_name: str = "idx_" + uuid4().hex if name is None else name - key_cols = [f'"{name}"' for name in columns] + keys = ( + [f'"{name}" "{op_class}"' for name, op_class in columns.items()] + if isinstance(columns, dict) + else [f'"{name}"' for name in columns] + ) assert self._db is not None self._db._execute( f'CREATE INDEX "{index_name}" ON {self._qualified_table_name} USING "{method}" (' - f' {",".join(key_cols)}' + f' {",".join(keys)}' f")", has_results=False, ) return self - def _explain(self, format: str = "TEXT") -> Iterable[Tuple[str]]: - """ - Explain the GreenplumPython :class:`~dataframe.DataFrame`'s execution plan. - - Args: - format: str: the format of the explain result. It can be one of "TEXT"/"XML"/"JSON"/"YAML". - - Returns: - Iterable[Tuple[str]]: The results of *EXPLAIN* query. - """ - assert self._db is not None - results = self._db._execute(f"EXPLAIN (FORMAT {format}) {self._build_full_query()}") - assert results is not None - return results - def group_by(self, *column_names: str) -> DataFrameGroupingSet: """ Group the current GreenplumPython :class:`~dataframe.DataFrame` by `column_names`. diff --git a/greenplumpython/pandas/dataframe.py b/greenplumpython/pandas/dataframe.py index 629a984c..3b959352 100644 --- a/greenplumpython/pandas/dataframe.py +++ b/greenplumpython/pandas/dataframe.py @@ -95,7 +95,7 @@ def to_sql( assert index is False, "DataFrame in GreenplumPython.pandas does not have an index column" table_name = f'"{name}"' if schema is None else f'"{schema}"."{name}"' database = db.Database(uri=con) - query = self._dataframe._build_full_query() + query = self._dataframe._serialize() if if_exists == "append": rowcount = database._execute( f""" diff --git a/tests/test_index.py b/tests/test_index.py index 131b1839..4cb71bf8 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -12,7 +12,14 @@ def test_op_on_consts(db: gp.Database): assert len(list(result)) == 1 and next(iter(result))["is_matched"] -def test_op_index(db: gp.Database): +def using_index_scan(results: gp.DataFrame, db: gp.Database) -> bool: + for row in db._execute(f"EXPLAIN {results._serialize()}"): + if "Index Scan" in row["QUERY PLAN"] or "Index Only Scan" in row["QUERY PLAN"]: + return True + return False + + +def test_index(db: gp.Database): import dataclasses import json @@ -27,18 +34,25 @@ class Student: student = ( db.create_dataframe(rows=rows, column_names=["info"]) .save_as(temp=True, column_names=["info"]) - .create_index(["info"], "gin") + .create_index({"info"}, "gin") ) db._execute("SET enable_seqscan TO False", has_results=False) json_contains = gp.operator("@>") - results = student[lambda t: json_contains(t["info"], json.dumps({"name": "john"}))]._explain() - using_index_scan = False - for row in results: - if "Index Scan" in row["QUERY PLAN"]: - using_index_scan = True - break - assert using_index_scan + assert using_index_scan( + student[lambda t: json_contains(t["info"], json.dumps({"name": "john"}))], db + ) + + +def test_index_opclass(db: gp.Database): + df = ( + db.create_dataframe(columns={"text": ["hello", "world"]}) + .save_as(temp=True, column_names=["text"]) + .create_index(columns={"text": "text_pattern_ops"}) + ) + db._execute("SET enable_seqscan TO off", has_results=False) + db._execute("SET optimizer TO off", has_results=False) + assert using_index_scan(df[lambda t: t["text"] > "hello"], db) def test_op_with_schema(db: gp.Database): From 79431808cee8e36bf492969afd584a2cde476dd1 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Wed, 24 May 2023 05:47:46 -0400 Subject: [PATCH 07/28] Adapt to PG --- tests/test_index.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/test_index.py b/tests/test_index.py index 4cb71bf8..be95c6bc 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -51,7 +51,14 @@ def test_index_opclass(db: gp.Database): .create_index(columns={"text": "text_pattern_ops"}) ) db._execute("SET enable_seqscan TO off", has_results=False) - db._execute("SET optimizer TO off", has_results=False) + db._execute(""" + do $$ + begin + set optimizer to off; + exception when others then + end; + $$; + """, has_results=False) assert using_index_scan(df[lambda t: t["text"] > "hello"], db) From 829182c56572968fd4caf66aa95e757697a5a79c Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Wed, 24 May 2023 05:54:18 -0400 Subject: [PATCH 08/28] Add doc --- greenplumpython/dataframe.py | 13 +++++++++++++ tests/test_index.py | 7 +++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index d25b73e5..dd04e966 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -946,6 +946,19 @@ def create_index( method: str = "btree", name: Optional[str] = None, ) -> "DataFrame": + """ + Create an index for the current dataframe for fast searching. + + The current dataframe is required to be saved before creating index. + + Args: + columns: key columns of the current dataframe to create index on. + method: index access method. + name: name of the index. + + Returns: + Dataframe with key columns indexed. + """ assert self.is_saved, "Cannot create index for unsaved dataframe." assert len(columns) > 0, "Column set to be indexed cannot be empty." diff --git a/tests/test_index.py b/tests/test_index.py index be95c6bc..e86e7cab 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -51,14 +51,17 @@ def test_index_opclass(db: gp.Database): .create_index(columns={"text": "text_pattern_ops"}) ) db._execute("SET enable_seqscan TO off", has_results=False) - db._execute(""" + db._execute( + """ do $$ begin set optimizer to off; exception when others then end; $$; - """, has_results=False) + """, + has_results=False, + ) assert using_index_scan(df[lambda t: t["text"] > "hello"], db) From 6bd3c4827e84fbeebc72462129cc2153f09ce6e1 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Wed, 24 May 2023 05:56:06 -0400 Subject: [PATCH 09/28] Format --- greenplumpython/dataframe.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index dd04e966..18c5d3fa 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -85,9 +85,7 @@ def __init__( @property def is_saved(self) -> bool: - """ - Check whether the current dataframe is saved in database. - """ + """Check whether the current dataframe is saved in database.""" return self._qualified_table_name is not None @singledispatchmethod From 3ad5598b00cc6b7ab8b6567ce57e580cd32fdc28 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 04:49:42 -0400 Subject: [PATCH 10/28] Add doc for operators (WIP) --- doc/source/op.rst | 8 ++++++++ greenplumpython/op.py | 3 +++ 2 files changed, 11 insertions(+) create mode 100644 doc/source/op.rst diff --git a/doc/source/op.rst b/doc/source/op.rst new file mode 100644 index 00000000..158d21b8 --- /dev/null +++ b/doc/source/op.rst @@ -0,0 +1,8 @@ +Operators and Indexing +======== +.. module:: greenplumpython + +.. automodule:: op + :members: + :show-inheritance: + :member-order: bysource \ No newline at end of file diff --git a/greenplumpython/op.py b/greenplumpython/op.py index 2022acfd..7f33337e 100644 --- a/greenplumpython/op.py +++ b/greenplumpython/op.py @@ -1,3 +1,6 @@ +""" +Indexing is essential for fast data searching in database. Unlike pandas, +""" from typing import Any, Optional, Union from greenplumpython.expr import BinaryExpr, UnaryExpr From ac2feb63f74d649e2df38779f77c046aab53759f Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 04:51:36 -0400 Subject: [PATCH 11/28] Fix issue of missing db (again) --- greenplumpython/expr.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index c851cab8..a229b1a3 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -569,9 +569,13 @@ def _init( if other_dataframe is not None and isinstance(right, Expr): other_dataframe = right._other_dataframe super().__init__(dataframe=dataframe, other_dataframe=other_dataframe) - self.operator = operator - self.left = left - self.right = right + self._operator = operator + self._left = left + if self._left._db is None: + self._left._db = self._db + self._right = right + if self._right._db is None: + self._right._db = self._db @overload def __init__( @@ -639,11 +643,13 @@ def __init__( (right._dataframe, right._other_dataframe) if isinstance(right, Expr) else (None, None) ) super().__init__(dataframe=dataframe, other_dataframe=other_dataframe) - self.operator = operator - self.right = right + self._operator = operator + self._right = right + if self._right._db is None: + self._right._db = self._db def _serialize(self) -> str: - right_str = str(self.right) + right_str = str(self._right) return f"{self.operator} ({right_str})" From 8aecbc69af7565cacc8425d41bca314d43fd1386 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 05:09:54 -0400 Subject: [PATCH 12/28] Fix field name --- greenplumpython/expr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index a229b1a3..74ef69e8 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -625,8 +625,8 @@ def __init__( def _serialize(self) -> str: from greenplumpython.expr import _serialize - left_str = _serialize(self.left) - right_str = _serialize(self.right) + left_str = _serialize(self._left) + right_str = _serialize(self._right) return f"({left_str} {self.operator} {right_str})" From 7453367d7f548216a1df61ff342568c47dbc9d9e Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 21:55:16 -0400 Subject: [PATCH 13/28] Assign db when serialize() --- greenplumpython/expr.py | 19 +++++++++++-------- greenplumpython/type.py | 2 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index 74ef69e8..2a589218 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -571,11 +571,7 @@ def _init( super().__init__(dataframe=dataframe, other_dataframe=other_dataframe) self._operator = operator self._left = left - if self._left._db is None: - self._left._db = self._db self._right = right - if self._right._db is None: - self._right._db = self._db @overload def __init__( @@ -625,6 +621,10 @@ def __init__( def _serialize(self) -> str: from greenplumpython.expr import _serialize + if isinstance(self._left._db, Expr): + self._left._db = self._db + if isinstance(self._right._db, Expr): + self._right._db = self._db left_str = _serialize(self._left) right_str = _serialize(self._right) return f"({left_str} {self.operator} {right_str})" @@ -645,12 +645,14 @@ def __init__( super().__init__(dataframe=dataframe, other_dataframe=other_dataframe) self._operator = operator self._right = right - if self._right._db is None: - self._right._db = self._db def _serialize(self) -> str: - right_str = str(self._right) - return f"{self.operator} ({right_str})" + from greenplumpython.expr import _serialize + + if isinstance(self._right._db, Expr): + self._right._db = self._db + right_str = _serialize(self._right) + return f"({self.operator} ({right_str}))" class InExpr(Expr): @@ -677,6 +679,7 @@ def _serialize(self) -> str: # when combining with `~` (bitwise not) operator. container_name: str = "cte_" + uuid4().hex if isinstance(self._container, Expr) and self._other_dataframe is not None: + self._container._db = self._db return ( f"(EXISTS (SELECT FROM {self._other_dataframe._name}" f" WHERE ({self._container._serialize()} = {self._item._serialize()})))" diff --git a/greenplumpython/type.py b/greenplumpython/type.py index 0a1696ef..9d120610 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -52,6 +52,8 @@ def __init__( ) def _serialize(self) -> str: + if isinstance(self._obj, Expr): + self._obj._db = self._db obj_str = _serialize(self._obj) return f"({obj_str}::{self._qualified_name_str})" From a2d98c6cd891b8d4605db95dc02e7d2f25ecedbc Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 22:28:08 -0400 Subject: [PATCH 14/28] Support type modifiers --- greenplumpython/expr.py | 8 +++---- greenplumpython/type.py | 48 +++++++++++++++-------------------------- 2 files changed, 21 insertions(+), 35 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index 2a589218..021e7131 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -621,13 +621,13 @@ def __init__( def _serialize(self) -> str: from greenplumpython.expr import _serialize - if isinstance(self._left._db, Expr): + if isinstance(self._left, Expr): self._left._db = self._db - if isinstance(self._right._db, Expr): + if isinstance(self._right, Expr): self._right._db = self._db left_str = _serialize(self._left) right_str = _serialize(self._right) - return f"({left_str} {self.operator} {right_str})" + return f"({left_str} {self._operator} {right_str})" class UnaryExpr(Expr): @@ -652,7 +652,7 @@ def _serialize(self) -> str: if isinstance(self._right._db, Expr): self._right._db = self._db right_str = _serialize(self._right) - return f"({self.operator} ({right_str}))" + return f"({self._operator} ({right_str}))" class InExpr(Expr): diff --git a/greenplumpython/type.py b/greenplumpython/type.py index 9d120610..4494a275 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -28,12 +28,7 @@ class TypeCast(Expr): (2 rows) """ - def __init__( - self, - obj: object, - type_name: str, - schema: Optional[str] = None, - ) -> None: + def __init__(self, obj: object, qualified_type_name: str) -> None: # noqa: D205 D400 """ Args: @@ -43,29 +38,13 @@ def __init__( dataframe = obj._dataframe if isinstance(obj, Expr) else None super().__init__(dataframe) self._obj = obj - self._type_name = type_name - self._schema = schema - self._qualified_name_str = ( - f'"{self._type_name}"' - if self._schema is None - else f'"{self._schema}"."{self._type_name}"' - ) + self._qualified_type_name = qualified_type_name def _serialize(self) -> str: if isinstance(self._obj, Expr): self._obj._db = self._db obj_str = _serialize(self._obj) - return f"({obj_str}::{self._qualified_name_str})" - - @property - def _qualified_name(self) -> Tuple[Optional[str], str]: - """ - Return the schema name and name of :class:`~type.TypeCast`. - - Returns: - Tuple[str, str]: schema name and :class:`~type.TypeCast`'s name. - """ - return self._schema, self._type_name + return f"({obj_str}::{self._qualified_type_name})" class Type: @@ -86,16 +65,23 @@ class Type: """ def __init__( - self, name: str, annotation: Optional[type] = None, schema: Optional[str] = None + self, + name: str, + annotation: Optional[type] = None, + schema: Optional[str] = None, + modifier: Optional[int] = None, ) -> None: # noqa: D107 self._name = name self._annotation = annotation self._created_in_dbs: Optional[Set[Database]] = set() if annotation is not None else None self._schema = schema - self._qualified_name_str = ( - f'"{self._name}"' if self._schema is None else f'"{self._schema}"."{self._name}"' - ) + self._modifier = modifier + self._qualified_name_str = f'"{self._name}"' + if self._schema is not None: + self._qualified_name_str = f'"{self._schema}".' + self._qualified_name_str + if self._modifier is not None: + self._qualified_name_str += self._qualified_name_str + f"({self._modifier})" # -- Creation of a composite type in Greenplum corresponding to the class_type given def _create_in_db(self, db: Database): @@ -140,7 +126,7 @@ def __call__(self, obj: Any) -> TypeCast: - Any :class:`Expr` consisting of adaptable Python objects and :class:`Column`s of a :class:`DataFrame`. """ - return TypeCast(obj, self._name, self._schema) + return TypeCast(obj, self._qualified_name_str) @property def _qualified_name(self) -> Tuple[Optional[str], str]: @@ -164,7 +150,7 @@ def _qualified_name(self) -> Tuple[Optional[str], str]: } -def type_(name: str, schema: Optional[str] = None) -> Type: +def type_(name: str, schema: Optional[str] = None, modifier: Optional[int] = None) -> Type: """ Get access to a type predefined in database. @@ -175,7 +161,7 @@ def type_(name: str, schema: Optional[str] = None) -> Type: Returns: The predefined type as a :class:`~type.Type` object. """ - return Type(name, schema=schema) + return Type(name, schema=schema, modifier=modifier) def to_pg_type( From 2ab6dda2a5be6f76bbc93f24163300d641a3e674 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 22:35:57 -0400 Subject: [PATCH 15/28] Fix type name error --- greenplumpython/type.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenplumpython/type.py b/greenplumpython/type.py index 4494a275..e5c95996 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -81,7 +81,7 @@ def __init__( if self._schema is not None: self._qualified_name_str = f'"{self._schema}".' + self._qualified_name_str if self._modifier is not None: - self._qualified_name_str += self._qualified_name_str + f"({self._modifier})" + self._qualified_name_str += f"({self._modifier})" # -- Creation of a composite type in Greenplum corresponding to the class_type given def _create_in_db(self, db: Database): From cefe239064658956a43d7647c01e6b9c8eceb3a1 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Tue, 30 May 2023 23:06:43 -0400 Subject: [PATCH 16/28] Bind to db from outside --- greenplumpython/dataframe.py | 1 + greenplumpython/db.py | 1 + greenplumpython/expr.py | 6 +++++- greenplumpython/func.py | 2 ++ greenplumpython/group.py | 9 +++++---- 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 18c5d3fa..ae646f44 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -475,6 +475,7 @@ def assign(self, **new_columns: Callable[["DataFrame"], Any]) -> "DataFrame": for k, f in new_columns.items(): v: Any = f(self) if isinstance(v, Expr): + v.bind(db=self._db) assert ( v._dataframe is None or v._dataframe == self ), "Newly included columns must be based on the current dataframe" diff --git a/greenplumpython/db.py b/greenplumpython/db.py index 324d5c3e..f3cb42ea 100644 --- a/greenplumpython/db.py +++ b/greenplumpython/db.py @@ -213,6 +213,7 @@ def assign(self, **new_columns: Callable[[], Any]) -> "DataFrame": for k, f in new_columns.items(): v: Any = f() if isinstance(v, Expr): + v.bind(db=self) assert v._dataframe is None, "New column should not depend on any dataframe." if isinstance(v, FunctionExpr): v = v.bind(db=self) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index 021e7131..04ea55c0 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -20,7 +20,11 @@ def __init__( # noqa: D107 self._dataframe = dataframe self._other_dataframe = other_dataframe - self._db = dataframe._db if dataframe is not None else None + self._db = dataframe._db if dataframe is not None else None # FIXME: set it to None + + def bind(self, db: Database) -> "Expr": + self._db = db + return self def __hash__(self) -> int: # noqa: D105 diff --git a/greenplumpython/func.py b/greenplumpython/func.py index 92f531a0..bf7b8aa2 100644 --- a/greenplumpython/func.py +++ b/greenplumpython/func.py @@ -83,6 +83,7 @@ def bind( def _serialize(self) -> str: # noqa D400 """:meta private:""" + assert self._db is not None, "Database is required to create function." self._function._create_in_db(self._db) distinct = "DISTINCT" if self._distinct else "" for arg in self._args: @@ -308,6 +309,7 @@ def _create_in_db(self, db: Database) -> None: func_src: str = inspect.getsource(self._wrapped_func) else: func_src: str = dill.source.getsource(self._wrapped_func) + assert isinstance(func_src, str) func_ast: ast.FunctionDef = ast.parse(dedent(func_src)).body[0] # TODO: Lambda expressions are NOT supported since inspect.signature() # does not work as expected. diff --git a/greenplumpython/group.py b/greenplumpython/group.py index b061a81c..0882022e 100644 --- a/greenplumpython/group.py +++ b/greenplumpython/group.py @@ -158,10 +158,11 @@ def assign(self, **new_columns: Callable[["DataFrame"], Any]) -> "DataFrame": targets: List[str] = self._flatten() for k, f in new_columns.items(): v: Any = f(self._dataframe).bind(group_by=self) - if isinstance(v, Expr) and not ( - v._dataframe is None or v._dataframe == self._dataframe - ): - raise Exception("Newly included columns must be based on the current dataframe") + if isinstance(v, Expr): + assert ( + v._dataframe is None or v._dataframe == self._dataframe + ), "Newly included columns must be based on the current dataframe" + v.bind(db=self._dataframe._db) targets.append(f"{_serialize(v)} AS {k}") return DataFrame( f"SELECT {','.join(targets)} FROM {self._dataframe._name} {self._clause()}", From 61b1c58d94defee1690d43f5205124babf4df0ae Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 31 May 2023 12:07:26 +0200 Subject: [PATCH 17/28] Rebase feat_index --- greenplumpython/dataframe.py | 9 ++++++++- greenplumpython/func.py | 8 +++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index ae646f44..e51d7948 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -341,6 +341,7 @@ def apply( func: Callable[["DataFrame"], "FunctionExpr"], expand: bool = False, column_name: Optional[str] = None, + row_id: Optional[str] = None, ) -> "DataFrame": """ Apply a dataframe function to the self :class:`~dataframe.DataFrame`. @@ -428,7 +429,11 @@ def apply( # # To fix this, we need to pass the dataframe to the resulting FunctionExpr # explicitly. - return func(self).bind(dataframe=self).apply(expand=expand, column_name=column_name) + return ( + func(self) + .bind(dataframe=self) + .apply(expand=expand, column_name=column_name, row_id=row_id) + ) def assign(self, **new_columns: Callable[["DataFrame"], Any]) -> "DataFrame": """ @@ -864,6 +869,7 @@ def save_as( temp: bool = False, storage_params: dict[str, Any] = {}, schema: Optional[str] = None, + distribution_key: Optional[List[str]] = None, ) -> "DataFrame": """ Save the GreenplumPython :class:`~dataframe.Dataframe` as a *table* into the database. @@ -934,6 +940,7 @@ def save_as( AS ( {self._serialize()} ) + DISTRIBUTED {f"BY ({','.join(distribution_key)})" if distribution_key is not None else "RANDOMLY"} """, has_results=False, ) diff --git a/greenplumpython/func.py b/greenplumpython/func.py index bf7b8aa2..69d7ce30 100644 --- a/greenplumpython/func.py +++ b/greenplumpython/func.py @@ -96,7 +96,9 @@ def _serialize(self) -> str: ) return f"{self._function._qualified_name_str}({distinct} {args_string})" - def apply(self, expand: bool = False, column_name: Optional[str] = None) -> DataFrame: + def apply( + self, expand: bool = False, column_name: Optional[str] = None, row_id: Optional[str] = None + ) -> DataFrame: # noqa D400 """ :meta private: @@ -126,7 +128,7 @@ def apply(self, expand: bool = False, column_name: Optional[str] = None) -> Data orig_func_dataframe = DataFrame( " ".join( [ - f"SELECT {str(self)} {'AS ' + column_name if column_name is not None else ''}", + f"SELECT {(row_id + ',') if row_id is not None else ''} {str(self)} {'AS ' + column_name if column_name is not None else ''}", ("," + ",".join(grouping_cols)) if (grouping_cols is not None) else "", from_clause, group_by_clause, @@ -164,7 +166,7 @@ def apply(self, expand: bool = False, column_name: Optional[str] = None) -> Data ) return DataFrame( - f"SELECT {str(results)} FROM {orig_func_dataframe._name}", + f"SELECT {(row_id + ',') if row_id is not None and expand else ''} {str(results)} FROM {orig_func_dataframe._name}", db=self._db, parents=[orig_func_dataframe], ) From 23842a683c1fa3afe90a1fafbdb9b3be25661897 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Fri, 9 Jun 2023 14:27:01 +0200 Subject: [PATCH 18/28] Enable Join using on handle common columns --- greenplumpython/dataframe.py | 72 ++++++++++++++++++++++++++++- greenplumpython/func.py | 2 +- tests/test_join.py | 90 ++++++++++++++++++++++-------------- 3 files changed, 128 insertions(+), 36 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index e51d7948..b1300dac 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -552,6 +552,7 @@ def join( on: Optional[Union[str, Iterable[str]]] = None, self_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"}, other_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"}, + on_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"}, ) -> "DataFrame": """ Join the current :class:`~dataframe.DataFrame` with another using the given arguments. @@ -577,6 +578,7 @@ def join( the corresponding key to avoid name conflicts. Asterisk :code:`"*"` can be used as a key to indicate all columns. other_columns: Same as `self_columns`, but for the **other** :class:`~dataframe.DataFrame`. + on_columns: A :class:`dict` whose keys are the column names of the resulting joined dataframe using `on` Note: When using :code:`"*"` as key in `self_columns` or `other_columns`, @@ -639,15 +641,83 @@ def bind(t: DataFrame, columns: Union[Dict[str, Optional[str]], Set[str]]) -> Li if on is not None else None ) + # USING clause in SQL uses argument `on`. sql_using_clause = f"USING ({join_column_names})" if join_column_names is not None else "" - return DataFrame( + + if on is None: + return DataFrame( + f""" + SELECT {",".join(target_list)} + FROM {self._name} {how} JOIN {other_clause} {sql_on_clause} {sql_using_clause} + """, + parents=[self, other], + ) + + def bind_using( + t: DataFrame, + columns: Union[Dict[str, Optional[str]], Set[str]], + on: Iterable[str], + suffix: str + ) -> List[str]: + target_list: List[str] = [] + for k in columns: + col: Column = t[k] + v = columns[k] if isinstance(columns, dict) else (k + suffix) if k in on else None + target_list.append(col._serialize() + (f' AS "{v}"' if v is not None else "")) + return target_list + + self_target_list = ( + bind_using(self, self_columns, on, "_l") + if isinstance(self_columns, set) + else bind(self, self_columns) + ) + other_target_list = ( + bind_using(other_temp, other_columns, on, "_r") + if isinstance(other_columns, set) + else bind(other_temp, other_columns) + ) + target_list = self_target_list + other_target_list + + join_dataframe = DataFrame( f""" SELECT {",".join(target_list)} FROM {self._name} {how} JOIN {other_clause} {sql_on_clause} {sql_using_clause} """, parents=[self, other], ) + coalesce_target_list = [] + if not (self_columns == {} or other_columns == {}): + for k in on: + s_v = self_columns[k] if isinstance(self_columns, dict) else (k + "_l") + o_v = other_columns[k] if isinstance(other_columns, dict) else (k + "_r") + coalesce_target_list.append(f"COALESCE({s_v},{o_v}) AS {k}") + + join_df = DataFrame( + f""" + SELECT * {("," + ",".join(coalesce_target_list)) if coalesce_target_list != [] else ""} + FROM {join_dataframe._name} + """, + parents=[join_dataframe], + ) + + self_columns_set = ( + self_columns + if isinstance(self_columns, set) + else set([k if k in on else v for k, v in self_columns.items()]) + ) + other_columns_set = ( + other_columns + if isinstance(other_columns, set) + else set([k if k in on else v for k, v in other_columns.items()]) + ) + return DataFrame( + f""" + SELECT {",".join(sorted(self_columns_set | other_columns_set))} + FROM {join_df._name} + """, + parents=[join_df], + ) inner_join = partialmethod(join, how="INNER") """ diff --git a/greenplumpython/func.py b/greenplumpython/func.py index 69d7ce30..c5b59cef 100644 --- a/greenplumpython/func.py +++ b/greenplumpython/func.py @@ -97,7 +97,7 @@ def _serialize(self) -> str: return f"{self._function._qualified_name_str}({distinct} {args_string})" def apply( - self, expand: bool = False, column_name: Optional[str] = None, row_id: Optional[str] = None + self, expand: bool = False, column_name: Optional[str] = None, row_id: Optional[str] = None ) -> DataFrame: # noqa D400 """ diff --git a/tests/test_join.py b/tests/test_join.py index f8ab5a50..13db37d9 100644 --- a/tests/test_join.py +++ b/tests/test_join.py @@ -103,8 +103,8 @@ def test_join_same_column_using(db: gp.Database): rows = [(1,), (2,), (3,)] t1 = db.create_dataframe(rows=rows, column_names=["id"]) t2 = db.create_dataframe(rows=rows, column_names=["id"]) - ret = t1.join(t2, on=["id"], self_columns={"id": "t1_id"}, other_columns={"id": "t2_id"}) - assert sorted(next(iter(ret)).keys()) == sorted(["t1_id", "t2_id"]) + ret = t1.join(t2, on=["id"], self_columns={"id"}, other_columns={"id"}) + assert sorted(next(iter(ret)).keys()) == ["id"] def test_join_same_column_names(db: gp.Database): @@ -121,21 +121,21 @@ def test_join_on_multi_columns(db: gp.Database): rows = [(1, 1), (2, 1), (3, 1)] t1 = db.create_dataframe(rows=rows, column_names=["id", "n"]) t2 = db.create_dataframe(rows=rows, column_names=["id", "n"]) - ret = t1.join(t2, on=["id", "n"], other_columns={}) - print(ret) + ret = t1.join(t2, on=["id", "n"], self_columns={"id", "n"}, other_columns={"id", "n"}) + assert sorted(next(iter(ret)).keys()) == sorted(["id", "n"]) def test_dataframe_inner_join(db: gp.Database, zoo_1: gp.DataFrame, zoo_2: gp.DataFrame): ret: gp.DataFrame = zoo_1.join( zoo_2, on=["animal"], - self_columns={"animal": "zoo1_animal", "id": "zoo1_id"}, - other_columns={"animal": "zoo2_animal", "id": "zoo2_id"}, + self_columns={"animal": "animal_l", "id": "id_zoo1"}, + other_columns={"animal": "animal_r", "id": "id_zoo2"}, ) assert len(list(ret)) == 2 + assert sorted(next(iter(ret)).keys()) == sorted(["animal", "id_zoo1", "id_zoo2"]) for row in ret: - assert row["zoo1_animal"] == row["zoo2_animal"] - assert row["zoo1_animal"] == "Lion" or row["zoo1_animal"] == "Tiger" + assert row["animal"] == "Lion" or row["animal"] == "Tiger" def test_dataframe_left_join(db: gp.Database, zoo_1: gp.DataFrame, zoo_2: gp.DataFrame): @@ -147,10 +147,7 @@ def test_dataframe_left_join(db: gp.Database, zoo_1: gp.DataFrame, zoo_2: gp.Dat ) assert len(list(ret)) == 4 for row in ret: - if row["zoo1_animal"] == "Lion" or row["zoo1_animal"] == "Tiger": - assert row["zoo1_animal"] == row["zoo2_animal"] - else: - assert row["zoo2_animal"] is None + if row["animal"] != "Lion" and row["animal"] != "Tiger": assert row["zoo2_id"] is None @@ -163,10 +160,7 @@ def test_dataframe_right_join(db: gp.Database, zoo_1: gp.DataFrame, zoo_2: gp.Da ) assert len(list(ret)) == 4 for row in ret: - if row["zoo2_animal"] == "Lion" or row["zoo2_animal"] == "Tiger": - assert row["zoo1_animal"] == row["zoo2_animal"] - else: - assert row["zoo1_animal"] is None + if row["animal"] != "Lion" and row["animal"] != "Tiger": assert row["zoo1_id"] is None @@ -179,17 +173,42 @@ def test_dataframe_full_join(db: gp.Database, zoo_1: gp.DataFrame, zoo_2: gp.Dat ) assert len(list(ret)) == 6 for row in ret: - if row["zoo2_animal"] == "Lion" or row["zoo2_animal"] == "Tiger": - assert row["zoo1_animal"] == row["zoo2_animal"] - else: - assert (row["zoo1_animal"] is None and row["zoo2_animal"] is not None) or ( - row["zoo1_animal"] is not None and row["zoo2_animal"] is None - ) + if row["animal"] != "Lion" and row["animal"] != "Tiger": assert (row["zoo1_id"] is None and row["zoo2_id"] is not None) or ( row["zoo1_id"] is not None and row["zoo2_id"] is None ) +def test_dataframe_full_join_with_empty(db: gp.Database): + # fmt: off + rows1 = [(1, 100,), (2, 200,), (3, 300,), (4, 400,)] + rows2 = [(3, 300, 3000,), (4, 400, 4000,), (5, 500, 5000,), (6, 600, 6000)] + # fmt: on + l_df = db.create_dataframe(rows=rows1, column_names=["a", "b"]) + r_df = db.create_dataframe(rows=rows2, column_names=["a", "b", "c"]) + ret = l_df.full_join( + r_df, + self_columns={"a", "b"}, + other_columns={"a", "b", "c"}, + on=["a", "b"], + ).order_by("a")[:] + assert len(list(ret)) == 6 + expected = ( + "----------------\n" + " a | b | c \n" + "---+-----+------\n" + " 1 | 100 | \n" + " 2 | 200 | \n" + " 3 | 300 | 3000 \n" + " 4 | 400 | 4000 \n" + " 5 | 500 | 5000 \n" + " 6 | 600 | 6000 \n" + "----------------\n" + "(6 rows)\n" + ) + assert str(ret) == expected + + def test_join_natural(db: gp.Database): # fmt: off rows1 = [("Smart Phone", 1,), ("Laptop", 2,), ("DataFramet", 3,)] @@ -202,8 +221,8 @@ def test_join_natural(db: gp.Database): ret = categories.join( products, on=["category_id"], - self_columns={"category_name", "category_id"}, - other_columns={"product_name"}, + self_columns={"category_id", "category_name"}, + other_columns={"category_id", "product_name"}, ) assert len(list(ret)) == 6 assert sorted(next(iter(ret)).keys()) == sorted( @@ -246,8 +265,6 @@ def test_dataframe_self_join(db: gp.Database, zoo_1: gp.DataFrame): other_columns={"animal": "zoo2_animal", "id": "zoo2_id"}, ) assert len(list(ret)) == 4 - for row in ret: - assert row["zoo1_animal"] == row["zoo2_animal"] def test_dataframe_self_join_cond(db: gp.Database, zoo_1: gp.DataFrame): @@ -271,20 +288,17 @@ def test_dataframe_join_save(db: gp.Database, zoo_1: gp.DataFrame): ) t_join.save_as( "dataframe_join", - column_names=["zoo1_animal", "zoo1_id", "zoo2_animal", "zoo2_id"], + column_names=["animal", "zoo1_id", "zoo2_id"], temp=True, ) t_join_reload = gp.DataFrame.from_table("dataframe_join", db=db) assert sorted(next(iter(t_join_reload)).keys()) == sorted( [ - "zoo1_animal", + "animal", "zoo1_id", - "zoo2_animal", "zoo2_id", ] ) - for row in t_join_reload: - assert row["zoo1_animal"] == row["zoo2_animal"] def test_dataframe_join_ine(db: gp.Database): @@ -307,11 +321,19 @@ def test_dataframe_multiple_self_join(db: gp.Database, zoo_1: gp.DataFrame): ) ret = t_join.join( zoo_1, - cond=lambda s, o: s["zoo1_animal"] == o["animal"], + on=["animal"], + self_columns={"animal", "zoo1_id", "zoo2_id"}, + other_columns={"animal", "id"}, ) assert len(list(ret)) == 4 - for row in ret: - assert row["zoo2_animal"] == row["animal"] + assert sorted(next(iter(ret)).keys()) == sorted( + [ + "animal", + "id", + "zoo1_id", + "zoo2_id", + ] + ) # This test case is to guarantee that the CTEs are generated in the reversed From bce63e3d0f9544b2ba046ea2dfc2824d0035de40 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Fri, 9 Jun 2023 15:56:30 +0200 Subject: [PATCH 19/28] Fix testcase for GPDB --- doc/source/modules.rst | 1 + doc/source/op.rst | 3 ++- greenplumpython/dataframe.py | 25 +++++++++++++------------ tests/test_schema.py | 3 ++- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/doc/source/modules.rst b/doc/source/modules.rst index d45df916..9a702759 100644 --- a/doc/source/modules.rst +++ b/doc/source/modules.rst @@ -16,5 +16,6 @@ The **GreenplumPython** library contains 5 main modules: type group order + op pd_df config \ No newline at end of file diff --git a/doc/source/op.rst b/doc/source/op.rst index 158d21b8..bc65240a 100644 --- a/doc/source/op.rst +++ b/doc/source/op.rst @@ -1,5 +1,6 @@ Operators and Indexing -======== +====================== + .. module:: greenplumpython .. automodule:: op diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index b1300dac..58d07a28 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -549,7 +549,7 @@ def join( other: "DataFrame", how: Literal["", "left", "right", "outer", "inner", "cross"] = "", cond: Optional[Callable[["DataFrame", "DataFrame"], Expr]] = None, - on: Optional[Union[str, Iterable[str]]] = None, + on: Iterable[str] = None, self_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"}, other_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"}, on_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"}, @@ -595,20 +595,21 @@ def join( ... age_rows, column_names=["name", "age"], db=db) >>> result = student.join( ... student, - ... on="age", - ... self_columns={"*"}, - ... other_columns={"name": "name_2"}) + ... on=["age"], + ... self_columns={"name": "name", "age": "age_1"}, + ... other_columns={"name": "name_2", "age": "age_2"}) >>> result ---------------------- - name | age | name_2 - -------+-----+-------- - alice | 18 | alice - bob | 19 | carol - bob | 19 | bob - carol | 19 | carol - carol | 19 | bob + age | name | name_2 + -----+-------+-------- + 18 | alice | alice + 19 | bob | carol + 19 | bob | bob + 19 | carol | carol + 19 | carol | bob ---------------------- (5 rows) + """ # FIXME : Raise Error if target columns don't exist assert how.upper() in [ @@ -658,7 +659,7 @@ def bind_using( t: DataFrame, columns: Union[Dict[str, Optional[str]], Set[str]], on: Iterable[str], - suffix: str + suffix: str, ) -> List[str]: target_list: List[str] = [] for k in columns: diff --git a/tests/test_schema.py b/tests/test_schema.py index 863a5307..89f1c43d 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -47,7 +47,8 @@ def test_schema_self_join_on(db: gp.Database, t: gp.DataFrame): ret: gp.DataFrame = t.join( t, on=["id"], - other_columns={"id": "id_1"}, + self_columns={"id"}, + other_columns={"id"}, ) assert len(list(ret)) == 10 From 04e49d0bb27ed130196345f4089e79e545efa619 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Fri, 9 Jun 2023 17:26:40 +0200 Subject: [PATCH 20/28] Fix df.apply(lambda t: type(func(t))) --- greenplumpython/expr.py | 7 ++++++- greenplumpython/type.py | 36 +++++++++++++++++++++++++++++++++++- tests/test_type.py | 16 ++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index 04ea55c0..44f5d8aa 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -22,8 +22,13 @@ def __init__( self._other_dataframe = other_dataframe self._db = dataframe._db if dataframe is not None else None # FIXME: set it to None - def bind(self, db: Database) -> "Expr": + def bind( + self, + dataframe: Optional["DataFrame"] = None, + db: Optional[Database] = None, + ) -> "Expr": self._db = db + self._dataframe = dataframe return self def __hash__(self) -> int: diff --git a/greenplumpython/type.py b/greenplumpython/type.py index e5c95996..dff11241 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -1,10 +1,13 @@ # noqa: D100 -from typing import Any, Dict, List, Optional, Set, Tuple, get_type_hints +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, get_type_hints from uuid import uuid4 from greenplumpython.db import Database from greenplumpython.expr import Expr, _serialize +if TYPE_CHECKING: + from greenplumpython.dataframe import DataFrame + class TypeCast(Expr): """ @@ -46,6 +49,37 @@ def _serialize(self) -> str: obj_str = _serialize(self._obj) return f"({obj_str}::{self._qualified_type_name})" + def bind( + self, + dataframe: Optional["DataFrame"] = None, + db: Optional[Database] = None, + column_name: str = None, + ) -> "Expr": + if isinstance(self._obj, Expr): + self._obj.bind( + dataframe=dataframe, + db=db, + ) + return self + + def apply( + self, expand: bool = False, column_name: Optional[str] = None, row_id: Optional[str] = None + ) -> "DataFrame": + from greenplumpython.dataframe import DataFrame + + if expand and column_name is None: + column_name = "func_" + uuid4().hex + return DataFrame( + f""" + SELECT {(row_id + ',') if row_id is not None else ''} + {self._serialize()} + {'AS ' + column_name if column_name is not None else ''} + {('FROM ' + self._obj._dataframe._name) if isinstance(self._obj, Expr) and self._obj._dataframe is not None else ""} + """, + db=self._db, + parents=[self._obj._dataframe], + ) + class Type: """ diff --git a/tests/test_type.py b/tests/test_type.py index 7f336dce..f9dd5827 100644 --- a/tests/test_type.py +++ b/tests/test_type.py @@ -21,6 +21,22 @@ def test_type_cast(db: gp.Database): assert row["complex"] == {"i": 2, "r": 1} +def test_type_cast_func_result(db: gp.Database): + float8 = gp.type_("float8") + rows = [(i, i) for i in range(10)] + df = db.create_dataframe(rows=rows, column_names=["a", "b"]) + + @gp.create_function + def func(a: int, b: int) -> int: + return a + b + + results = df.apply( + lambda t: float8(func(t["a"], t["b"])), + column_name="float8", + ) + assert sorted([row["float8"] for row in results]) == list(range(0, 20, 2)) + + def test_type_create(db: gp.Database): @dataclasses.dataclass class Person: From ebd3aad7360122d4a24238438c4758720883746c Mon Sep 17 00:00:00 2001 From: Ruxue Zeng <36695415+ruxuez@users.noreply.github.com> Date: Tue, 13 Jun 2023 11:10:27 +0200 Subject: [PATCH 21/28] Fix select slice with [:] (#196) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch fixes the following issue when selecting an entire dataframe with slice doesn't contain a LIMIT clause in the query. --------- Co-authored-by: Xuebin Su (苏学斌) <12034000+xuebinsu@users.noreply.github.com> --- greenplumpython/dataframe.py | 9 ++++++--- tests/test_dataframe.py | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 58d07a28..1dcf5509 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -26,6 +26,7 @@ `_ for syncing updates. """ import json +import sys from collections import abc from functools import partialmethod, singledispatchmethod from typing import ( @@ -117,12 +118,14 @@ def _(self, rows: slice) -> "DataFrame": raise NotImplementedError() offset_clause = "" if rows.start is None else f"OFFSET {rows.start}" limit_clause = ( - "" + sys.maxsize if rows.stop is None - else f"LIMIT {rows.stop if rows.start is None else rows.stop - rows.start}" + else rows.stop + if rows.start is None + else rows.stop - rows.start ) return DataFrame( - f"SELECT * FROM {self._name} {limit_clause} {offset_clause}", + f"SELECT * FROM {self._name} LIMIT {limit_clause} {offset_clause}", parents=[self], ) diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py index 3c5d25c5..7794f65a 100644 --- a/tests/test_dataframe.py +++ b/tests/test_dataframe.py @@ -62,6 +62,12 @@ def test_dataframe_getitem_slice_off_limit(db: gp.Database, t: gp.DataFrame): assert len(list(t[2:5])) == 3 +def test_dataframe_getitem_slice_off_limit(db: gp.Database, t: gp.DataFrame): + query = t[:]._build_full_query() + assert len(list(t[:])) == 10 + assert "LIMIT" in query + + def test_dataframe_display_repr(db: gp.Database): # fmt: off rows = [(1, 1, "Lion",), (2, 2, "Tiger",), (3, 3, "Wolf",), (4, 4, "Fox")] From 207f498797259164f9828e7bf0d5261e8c409c10 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Tue, 13 Jun 2023 17:32:21 +0200 Subject: [PATCH 22/28] Add distribution type --- greenplumpython/dataframe.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 1dcf5509..79e8e90f 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -943,7 +943,8 @@ def save_as( temp: bool = False, storage_params: dict[str, Any] = {}, schema: Optional[str] = None, - distribution_key: Optional[List[str]] = None, + distribution_method: Literal[None, "randomly", "replicated", "keys"] = None, + distribution_keys: Optional[List[str]] = None, ) -> "DataFrame": """ Save the GreenplumPython :class:`~dataframe.Dataframe` as a *table* into the database. @@ -960,6 +961,8 @@ def save_as( storage_params: storage_parameter of gpdb, reference https://docs.vmware.com/en/VMware-Tanzu-Greenplum/7/greenplum-database/GUID-ref_guide-sql_commands-CREATE_TABLE_AS.html schema: schema of the table for avoiding name conflicts. + distribution_method: method of distribution by + distribution_keys: list of distribution keys Returns: DataFrame : :class:`~dataframe.DataFrame` represents the newly saved table @@ -1006,6 +1009,15 @@ def save_as( if table_name is None: table_name = self._name if not self.is_saved else "cte_" + uuid4().hex qualified_table_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"' + distribution_clause = ( + f""" + DISTRIBUTED {f"BY ({','.join(distribution_keys)})" + if distribution_method == "keys" + else "REPLICATED"} + """ + if distribution_method is not None + else "" + ) self._db._execute( f""" CREATE {'TEMP' if temp else ''} TABLE {qualified_table_name} @@ -1014,7 +1026,7 @@ def save_as( AS ( {self._serialize()} ) - DISTRIBUTED {f"BY ({','.join(distribution_key)})" if distribution_key is not None else "RANDOMLY"} + {distribution_clause} """, has_results=False, ) From 302a449a9e4f4d13aa2e4222a92b895ff45bcaae Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 14 Jun 2023 14:52:58 +0200 Subject: [PATCH 23/28] Remove distribution key --- greenplumpython/dataframe.py | 14 -------------- tests/test_dataframe.py | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/greenplumpython/dataframe.py b/greenplumpython/dataframe.py index 79e8e90f..27034255 100644 --- a/greenplumpython/dataframe.py +++ b/greenplumpython/dataframe.py @@ -943,8 +943,6 @@ def save_as( temp: bool = False, storage_params: dict[str, Any] = {}, schema: Optional[str] = None, - distribution_method: Literal[None, "randomly", "replicated", "keys"] = None, - distribution_keys: Optional[List[str]] = None, ) -> "DataFrame": """ Save the GreenplumPython :class:`~dataframe.Dataframe` as a *table* into the database. @@ -961,8 +959,6 @@ def save_as( storage_params: storage_parameter of gpdb, reference https://docs.vmware.com/en/VMware-Tanzu-Greenplum/7/greenplum-database/GUID-ref_guide-sql_commands-CREATE_TABLE_AS.html schema: schema of the table for avoiding name conflicts. - distribution_method: method of distribution by - distribution_keys: list of distribution keys Returns: DataFrame : :class:`~dataframe.DataFrame` represents the newly saved table @@ -1009,15 +1005,6 @@ def save_as( if table_name is None: table_name = self._name if not self.is_saved else "cte_" + uuid4().hex qualified_table_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"' - distribution_clause = ( - f""" - DISTRIBUTED {f"BY ({','.join(distribution_keys)})" - if distribution_method == "keys" - else "REPLICATED"} - """ - if distribution_method is not None - else "" - ) self._db._execute( f""" CREATE {'TEMP' if temp else ''} TABLE {qualified_table_name} @@ -1026,7 +1013,6 @@ def save_as( AS ( {self._serialize()} ) - {distribution_clause} """, has_results=False, ) diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py index 7794f65a..0f8ab91c 100644 --- a/tests/test_dataframe.py +++ b/tests/test_dataframe.py @@ -63,7 +63,7 @@ def test_dataframe_getitem_slice_off_limit(db: gp.Database, t: gp.DataFrame): def test_dataframe_getitem_slice_off_limit(db: gp.Database, t: gp.DataFrame): - query = t[:]._build_full_query() + query = t[:]._serialize() assert len(list(t[:])) == 10 assert "LIMIT" in query From 851d4359fa92c61830c2419a12b927a020af27f9 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Mon, 19 Jun 2023 14:22:09 +0200 Subject: [PATCH 24/28] Add db backend pid print in execute --- greenplumpython/db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/greenplumpython/db.py b/greenplumpython/db.py index f3cb42ea..2fc4dd78 100644 --- a/greenplumpython/db.py +++ b/greenplumpython/db.py @@ -60,6 +60,8 @@ def _execute(self, query: str, has_results: bool = True) -> Union[Iterable[Tuple """ with self._conn.cursor() as cursor: + cursor.execute("SELECT pg_backend_pid()") + print("BACKEND SESSION PID: ", cursor.fetchall()) if config.print_sql: print(query) cursor.execute(query) From d972bca8743284c5645d84cd5428d297ec0cac21 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Mon, 19 Jun 2023 14:41:04 +0200 Subject: [PATCH 25/28] Add print db object id in execute --- greenplumpython/db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/greenplumpython/db.py b/greenplumpython/db.py index 2fc4dd78..37b47a1d 100644 --- a/greenplumpython/db.py +++ b/greenplumpython/db.py @@ -62,6 +62,7 @@ def _execute(self, query: str, has_results: bool = True) -> Union[Iterable[Tuple with self._conn.cursor() as cursor: cursor.execute("SELECT pg_backend_pid()") print("BACKEND SESSION PID: ", cursor.fetchall()) + print("OBJECT ID: ", id(self)) if config.print_sql: print(query) cursor.execute(query) @@ -69,6 +70,7 @@ def _execute(self, query: str, has_results: bool = True) -> Union[Iterable[Tuple def close(self) -> None: """Close the database connection.""" + print("OBJECT ID CLOSED: ", id(self)) self._conn.close() def create_dataframe( From afd50df3a8f744982fd45ab701eccc658b3cc877 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 26 Jul 2023 14:29:20 +0200 Subject: [PATCH 26/28] @xuebinsu Fix bug on existing data types --- greenplumpython/type.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/greenplumpython/type.py b/greenplumpython/type.py index dff11241..64978ef3 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -225,7 +225,8 @@ def to_pg_type( args: Tuple[type, ...] = annotation.__args__ if for_return: return f"SETOF {to_pg_type(args[0], db)}" # type: ignore - if args[0] in _defined_types: + # if args[0] in _defined_types: + else: return f"{to_pg_type(args[0], db)}[]" # type: ignore raise NotImplementedError() else: From 909e80b643edcfa078bcde5516da11f6cda4aab7 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 26 Jul 2023 14:35:51 +0200 Subject: [PATCH 27/28] @xuebinsu Fix bug on existing data types --- greenplumpython/expr.py | 5 +++-- greenplumpython/type.py | 5 +++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index 44f5d8aa..c0d53090 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -634,8 +634,9 @@ def _serialize(self) -> str: self._left._db = self._db if isinstance(self._right, Expr): self._right._db = self._db - left_str = _serialize(self._left) - right_str = _serialize(self._right) + # FIXME: Move _serialize() to be a method of Database. + left_str = _serialize(self._left._bind(db=self._db)) + right_str = _serialize(self._right._bind(db=self._db)) return f"({left_str} {self._operator} {right_str})" diff --git a/greenplumpython/type.py b/greenplumpython/type.py index 64978ef3..268438d8 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -135,6 +135,9 @@ def _create_in_db(self, db: Database): """ if self._created_in_dbs is None or db in self._created_in_dbs: return + assert isinstance( + self._annotation, type + ), "Only composite data types can be created in database." schema = "pg_temp" members = get_type_hints(self._annotation) if len(members) == 0: @@ -230,6 +233,8 @@ def to_pg_type( return f"{to_pg_type(args[0], db)}[]" # type: ignore raise NotImplementedError() else: + if isinstance(annotation, Type): + return annotation._qualified_name_str assert db is not None, "Database is required to create type" if annotation not in _defined_types: type_name = "type_" + uuid4().hex From c58e419520d892f7c9264c7dab1184514d5120a5 Mon Sep 17 00:00:00 2001 From: Ruxue Zeng Date: Wed, 26 Jul 2023 14:40:06 +0200 Subject: [PATCH 28/28] Revert last 2 commits --- greenplumpython/expr.py | 5 ++--- greenplumpython/type.py | 8 +------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/greenplumpython/expr.py b/greenplumpython/expr.py index c0d53090..44f5d8aa 100644 --- a/greenplumpython/expr.py +++ b/greenplumpython/expr.py @@ -634,9 +634,8 @@ def _serialize(self) -> str: self._left._db = self._db if isinstance(self._right, Expr): self._right._db = self._db - # FIXME: Move _serialize() to be a method of Database. - left_str = _serialize(self._left._bind(db=self._db)) - right_str = _serialize(self._right._bind(db=self._db)) + left_str = _serialize(self._left) + right_str = _serialize(self._right) return f"({left_str} {self._operator} {right_str})" diff --git a/greenplumpython/type.py b/greenplumpython/type.py index 268438d8..dff11241 100644 --- a/greenplumpython/type.py +++ b/greenplumpython/type.py @@ -135,9 +135,6 @@ def _create_in_db(self, db: Database): """ if self._created_in_dbs is None or db in self._created_in_dbs: return - assert isinstance( - self._annotation, type - ), "Only composite data types can be created in database." schema = "pg_temp" members = get_type_hints(self._annotation) if len(members) == 0: @@ -228,13 +225,10 @@ def to_pg_type( args: Tuple[type, ...] = annotation.__args__ if for_return: return f"SETOF {to_pg_type(args[0], db)}" # type: ignore - # if args[0] in _defined_types: - else: + if args[0] in _defined_types: return f"{to_pg_type(args[0], db)}[]" # type: ignore raise NotImplementedError() else: - if isinstance(annotation, Type): - return annotation._qualified_name_str assert db is not None, "Database is required to create type" if annotation not in _defined_types: type_name = "type_" + uuid4().hex