90 changes: 89 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,92 @@
# dbt-fabricspark Changelog

- This file provides a full account of all changes to `dbt-fabricspark`.
## Previous Releases
## 1.9.1rc2 (2024-12-31)

### experimental feature

- Enable Python models, fixing [Issue #32](https://github.com/microsoft/dbt-fabricspark/issues/32). [Get in contact](mailto:willem.liang@icloud.com)
<details><summary>Usage</summary>

Create your PySpark model in your dbt project and implement the Python function `model(dbt, session) -> DataFrame` in your model's code.
- [Release Note](https://www.getdbt.com/blog/introducing-support-for-python)
- [Documentation](https://docs.getdbt.com/docs/build/python-models)

Example:
```python
# A UDF that calculates the distance between two coordinates
from coordTransform_py import coordTransform_utils  # see also: https://github.com/wandergis/coordTransform_py
from geopy.distance import geodesic
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(DoubleType())
def udf_geo_distance(lng1, lat1, lng2, lat2, vendor1, vendor2):
    # Converters from each vendor's coordinate system to WGS84
    wgs84_converters = {
        'baidu': coordTransform_utils.bd09_to_wgs84,
        'amap': coordTransform_utils.gcj02_to_wgs84,
        'tencent': lambda lng, lat: (lng, lat),
        'google': lambda lng, lat: (lng, lat)
    }

    convert1 = wgs84_converters.get(vendor1)
    convert2 = wgs84_converters.get(vendor2)
    # Convert both coordinates into WGS84; geodesic expects (lat, lng) tuples
    coord1 = tuple(reversed(convert1(lng1, lat1)))
    coord2 = tuple(reversed(convert2(lng2, lat2)))
    # Calculate the distance in meters
    distance = geodesic(coord1, coord2).meters
    return distance

def model(dbt, session) -> DataFrame:
    records = [
        {
            'coord1_vendor': 'amap',
            'coord1_addr': 'Zhangjiang High-Tech Park',
            'coord1_lng': 121.587691,
            'coord1_lat': 31.201839,
            'coord2_vendor': 'baidu',
            'coord2_addr': 'JinKe Rd.',
            'coord2_lng': 121.608551,
            'coord2_lat': 31.210002
        }
    ]
    source_df = session.createDataFrame(records)

    # Data processing with the DataFrame API and UDFs
    final_df = source_df.withColumn(
        "distance",
        udf_geo_distance(
            source_df["coord1_lng"], source_df["coord1_lat"],
            source_df["coord2_lng"], source_df["coord2_lat"],
            source_df["coord1_vendor"], source_df["coord2_vendor"])
    )
    return final_df
```
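The example above builds its input data in-line. A real model will usually read from upstream dbt objects and set its materialization from inside the model body; below is a minimal sketch of that shape, assuming a hypothetical upstream model named `stg_coordinates` (the name is illustrative only). `dbt.config` and `dbt.ref` are the standard dbt Python-model helpers described in the documentation linked above.

```python
from pyspark.sql import DataFrame


def model(dbt, session) -> DataFrame:
    # Materialization and other configs can be set from inside the model
    dbt.config(materialized="table")

    # `stg_coordinates` is a hypothetical upstream model used for illustration;
    # on Spark-based adapters dbt.ref() returns a Spark DataFrame
    source_df = dbt.ref("stg_coordinates")

    # Any Spark DataFrame transformation can go here before returning the result
    return source_df
```

The model can then be run like any other, e.g. `dbt run --select <model_name>`.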
</details>

## 1.9.1rc1 (2024-12-25)

### upgrade dbt-core

- Upgrade dbt-core to v1.9.1, keeping pace with dbt-spark v1.9.0 and dbt-fabric v1.9.0 (both the latest versions), along with logging timezone support
- The version string follows the dbt-core version ("1.9.1")


## 1.7.0rc3 (2024-12-18)

### bug fix

- Support Lakehouse schema
- Quick fix for dbt-core issues #6185 and #9573 (dbt-core v1.8.9)

### new feature

- New custom macro `read_lakehouse_file`, which enables querying lakehouse files in a data model
- Add dbt-spark to the requirements list


## 1.7.0rc2 (2024-12-04)

### patch

- Upgraded the legacy APIs to dbt v1.8.9, based on dbt-fabricspark v1.7.0rc1. [Get in contact](mailto:willem.liang@icloud.com)
299 changes: 236 additions & 63 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dbt/adapters/fabricspark/__version__.py
@@ -1 +1 @@
version = "1.7.0rc1"
version = "1.9.1rc2"
2 changes: 1 addition & 1 deletion dbt/adapters/fabricspark/column.py
@@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, TypeVar, Union
from dbt.adapters.base.column import Column
from dbt.dataclass_schema import dbtClassMixin
from dbt_common.dataclass_schema import dbtClassMixin

Self = TypeVar("Self", bound="SparkColumn")

26 changes: 14 additions & 12 deletions dbt/adapters/fabricspark/connections.py
@@ -1,17 +1,17 @@
from contextlib import contextmanager
import os
import dbt.exceptions
import dbt.adapters.exceptions

from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import ConnectionState, AdapterResponse
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
from dbt.adapters.contracts.connection import ConnectionState, AdapterResponse
from dbt.adapters.events.logging import AdapterLogger
from dbt_common.events.functions import fire_event
from dbt.adapters.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
from dbt.utils import DECIMALS
from dbt.adapters.fabricspark.livysession import LivySessionConnectionWrapper, LivySessionManager

from dbt.contracts.connection import Connection
from dbt.dataclass_schema import StrEnum
from dbt.adapters.contracts.connection import Connection
from dbt_common.dataclass_schema import StrEnum
from typing import Any, Optional, Union, Tuple, List, Generator, Iterable, Sequence
from abc import ABC, abstractmethod
import time
@@ -163,7 +163,7 @@ def open(cls, connection: Connection) -> Connection:
msg = "Failed to connect"
if creds.token is not None:
msg += ", is your token valid?"
raise dbt.exceptions.FailedToConnectError(msg) from e
raise dbt.adapters.exceptions.FailedToConnectError(msg) from e
retryable_message = _is_retryable_error(e)
if retryable_message and creds.connect_retries > 0:
msg = (
@@ -184,12 +184,12 @@ def open(cls, connection: Connection) -> Connection:
logger.warning(msg)
time.sleep(creds.connect_timeout)
else:
raise dbt.exceptions.FailedToConnectError("failed to connect") from e
raise dbt.adapters.exceptions.FailedToConnectError("failed to connect") from e
else:
raise exc # type: ignore

if handle is None:
raise dbt.exceptions.FailedToConnectError("Failed to connect to Livy session. Common reasons for errors: \n1. Invalid/expired credentials (if using CLI authentication, re-run `az login` in your terminal) \n2. Invalid endpoint \n3. Invalid workspaceid or lakehouseid (do you have the correct permissions?) \n4. Invalid or non-existent shortcuts json path, or improperly formatted shortcuts")
raise dbt.adapters.exceptions.FailedToConnectError("Failed to connect to Livy")
connection.handle = handle
connection.state = ConnectionState.OPEN
return connection
@@ -240,7 +240,7 @@ def fetch_spark_version(cls, connection) -> None:
try:
sql = "split(version(), ' ')[0] as version"
cursor = connection.handle.cursor()
cursor.execute(sql)
cursor.execute(sql, 'sql') # add language parameter
res = cursor.fetchall()
SparkConnectionManager.spark_version = res[0][0]

@@ -252,9 +252,11 @@ def fetch_spark_version(cls, connection) -> None:
os.environ["DBT_SPARK_VERSION"] = SparkConnectionManager.spark_version
logger.debug(f"SPARK VERSION {os.getenv('DBT_SPARK_VERSION')}")

# Add language parameter.
def add_query(
self,
sql: str,
language: str = 'sql',
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
@@ -276,7 +278,7 @@ def add_query(
cursor = connection.handle.cursor()

try:
cursor.execute(sql, bindings)
cursor.execute(sql, language, bindings)
except Exception as ex:
query_exception = ex

5 changes: 3 additions & 2 deletions dbt/adapters/fabricspark/fabric_spark_credentials.py
@@ -1,4 +1,4 @@
from dbt.adapters.base import Credentials
from dbt.adapters.contracts.connection import Credentials
from typing import Any, Dict, Optional, Tuple
from dataclasses import dataclass, field
import dbt.exceptions
@@ -56,7 +56,8 @@ def __post_init__(self) -> None:
# f"On Spark, lakehouse must be omitted or have the same value as"
# # f" schema."
# # )
self.schema = self.lakehouse
self.schema = f"{self.lakehouse}.{self.schema}" # Support Fabric Lakehouse schema
# self.schema = self.lakehouse

@property
def type(self) -> str:
41 changes: 33 additions & 8 deletions dbt/adapters/fabricspark/impl.py
@@ -1,25 +1,28 @@
import re
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Union, Tuple, Callable, Set
from typing import Any, Dict, Iterable, List, Optional, Union, Tuple, Callable, Set, Type
from dbt.adapters.base.relation import InformationSchema
from dbt.contracts.graph.manifest import Manifest
from typing_extensions import TypeAlias
import agate
import dbt
import dbt.exceptions
from dbt.adapters.base import AdapterConfig
from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed, ConstraintSupport
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.fabricspark import SparkConnectionManager
from dbt.adapters.fabricspark import SparkRelation
from dbt.adapters.fabricspark import SparkColumn
from dbt.adapters.base import BaseRelation
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt_common.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.contracts.graph.nodes import ConstraintType
from dbt.contracts.relation import RelationType
from dbt.events import AdapterLogger
from dbt.utils import executor, AttrDict
from dbt.adapters.contracts.relation import RelationType
from dbt.adapters.events.logging import AdapterLogger
from dbt_common.utils import executor, AttrDict

from dbt.adapters.fabricspark.python_submissions import BaseFabricSparkHelper
from dbt.adapters.contracts.connection import AdapterResponse

logger = AdapterLogger("fabricspark")

@@ -439,7 +442,7 @@ def get_rows_different_sql(
def run_sql_for_tests(self, sql, fetch, conn):
cursor = conn.handle.cursor()
try:
cursor.execute(sql)
cursor.execute(sql, "sql") # Add language parameter.
if fetch == "one":
if hasattr(cursor, "fetchone"):
return cursor.fetchone()
@@ -456,6 +459,28 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False

# Add Python Model Support
@property
def default_python_submission_method(self) -> str:
return "livy_session_statement"

# Add Python Model Support
@property
def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
# TODO: Figure out submission_method enumerates to enable Livy Batch Jobs.
# The current submission method returns 'workflow_job', but in reality, what we need is a Livy session statement job.
# The keyword 'workflow_job' cannot be found in dbt-spark, dbt-fabricspark, or dbt-core.
# I assume the two types of jobs are defined as 'livy_session_batch' and 'livy_session_statement'.
return {
"workflow_job": BaseFabricSparkHelper,
"livy_session_statement": BaseFabricSparkHelper,
"livy_session_batch": BaseFabricSparkHelper,
}

# Add Python Model Support
def generate_python_submission_response(self, submission_result: Any) -> AdapterResponse:
return self.connections.get_response(None)

def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
grants_dict: Dict[str, List[str]] = {}
for row in grants_table:
@@ -475,7 +500,7 @@ def standardize_grants_dict(self, grants_table: agate.Table) -> dict:

def debug_query(self) -> None:
"""Override for DebugTask method"""
self.execute("select 1 as id")
self.execute("select 1 as id", "sql") # Add language parameter.


# spark does something interesting with joins when both tables have the same