diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0792418..0f5b89d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,92 @@
# dbt-fabricspark Changelog
- This file provides a full account of all changes to `dbt-fabricspark`.
-## Previous Releases
+## 1.9.1rc2 (2024-12-31)
+
+### experimental feature
+
+- Enable Python models, fixing [Issue #32](https://github.com/microsoft/dbt-fabricspark/issues/32). [Contact the author](mailto:willem.liang@icloud.com) for questions.
+
+  Usage
+
+  Create a PySpark model in your dbt project and implement the Python function `model(dbt, session) -> DataFrame` in the model's code.
+  - [Release Note](https://www.getdbt.com/blog/introducing-support-for-python)
+  - [Documentation](https://docs.getdbt.com/docs/build/python-models)
+
+ Example:
+ ```python
+  # A UDF to calculate the distance between two coordinates
+  from coordTransform_py import coordTransform_utils  # see also: https://github.com/wandergis/coordTransform_py
+  from geopy.distance import geodesic
+  from pyspark.sql import DataFrame
+  from pyspark.sql.functions import udf
+  from pyspark.sql.types import DoubleType
+
+ @udf(DoubleType())
+ def udf_geo_distance(lng1,lat1,lng2,lat2,vendor1,vendor2):
+ wgs84_converters = {
+ 'baidu': coordTransform_utils.bd09_to_wgs84,
+ 'amap': coordTransform_utils.gcj02_to_wgs84,
+ 'tencent': lambda lng, lat: (lng, lat),
+ 'google': lambda lng, lat: (lng, lat)
+ }
+
+ convert1 = wgs84_converters.get(vendor1)
+ convert2 = wgs84_converters.get(vendor2)
+ # convert into WGS84
+ coord1 = tuple(reversed(convert1(lng1, lat1)))
+ coord2 = tuple(reversed(convert2(lng2, lat2)))
+ # calculate distance
+ distance = geodesic(coord1, coord2).meters
+ return distance
+
+ def model(dbt, session) -> DataFrame:
+ records = [
+ {
+ 'coord1_vendor':'amap',
+ 'coord1_addr':'Zhangjiang High-Tech Park',
+ 'coord1_lng':121.587691,
+ 'coord1_lat':31.201839,
+ 'coord2_vendor':'baidu',
+ 'coord2_addr':'JinKe Rd.',
+ 'coord2_lng':121.608551,
+ 'coord2_lat':31.210002
+ }
+ ]
+      source_df = session.createDataFrame(records)
+
+      # Data processing via the DataFrame API and UDFs
+      final_df = source_df.withColumn("distance",
+          udf_geo_distance(
+              source_df["coord1_lng"], source_df["coord1_lat"],
+              source_df["coord2_lng"], source_df["coord2_lat"],
+              source_df["coord1_vendor"], source_df["coord2_vendor"])
+      )
+ return final_df
+ ```
+
+
+## 1.9.1rc1 (2024-12-25)
+
+### upgrade dbt-core
+
+- Upgrade dbt-core to v1.9.1 to keep pace with dbt-spark v1.9.0 and dbt-fabric v1.9.0 (both the latest versions), along with logging timezone support
+- The version string follows the dbt-core version ("1.9.1")
+
+
+## 1.7.0rc3 (2024-12-18)
+
+### bug fix
+
+- Support Lakehouse schemas: relations now resolve to `<lakehouse>.<schema>.<model>` (see the profile sketch below)
+- Quick fix for dbt-core issues #6185 and #9573 (dbt-core v1.8.9)
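+
+With Lakehouse schema support, a profile like the sketch below (placeholder values) makes relations resolve to `dl_playground.dbo.<model>` instead of `dl_playground.<model>`:
+
+```yaml
+# excerpt from an output in profiles.yml (placeholder values)
+lakehouse: dl_playground   # Lakehouse name
+schema: dbo                # Lakehouse schema; the adapter now targets <lakehouse>.<schema>
+```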
+
+### new feature
+
+- New custom macro `read_lakehouse_file`, which enables querying a lakehouse file directly in a data model (see the usage sketch below)
+- Add dbt-spark to the requirement list
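+
+A minimal usage sketch (the model name and file path are hypothetical; the macro expands the path to the file's `abfss://` OneLake location and relies on Spark SQL's format-prefixed path syntax):
+
+```sql
+-- models/staging/stg_orders.sql (hypothetical model)
+select *
+from {{ read_lakehouse_file('Files/raw/orders.csv', 'csv') }}
+```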
+
+
+## 1.7.0rc2 (2024-12-04)
+
+### patch
+
+- Upgraded the legacy APIs to dbt v1.8.9, based on dbt-fabricspark v1.7.0rc1. [Contact the author](mailto:willem.liang@icloud.com) for questions.
\ No newline at end of file
diff --git a/README.md b/README.md
index 7bd7355..40129f0 100644
--- a/README.md
+++ b/README.md
@@ -1,63 +1,236 @@
-
-
-
-
-
-[dbt](https://www.getdbt.com/) enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
-
-dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
-
-## dbt-fabricspark
-
-The `dbt-fabricspark` package contains all of the code enabling dbt to work with Synapse Spark in Microsoft Fabric. For more information, consult [the docs](https://docs.getdbt.com/docs/profile-fabricspark).
-
-## Getting started
-
-- [Install dbt](https://docs.getdbt.com/docs/installation)
-- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)
-
-## Running locally
-Use livy endpoint to connect to Synapse Spark in Microsoft Fabric. The binaries required to setup local environment is not possiblw with Synapse Spark in Microsoft Fabric. However, you can configure profile to connect via livy endpoints.
-
-Create a profile like this one:
-
-```yaml
-fabric-spark-test:
- target: fabricspark-dev
- outputs:
- fabricspark-dev:
- authentication: CLI
- method: livy
- connect_retries: 0
- connect_timeout: 10
- endpoint: https://api.fabric.microsoft.com/v1
- workspaceid: bab084ca-748d-438e-94ad-405428bd5694
- lakehouseid: ccb45a7d-60fc-447b-b1d3-713e05f55e9a
- lakehouse: test
- schema: test
- threads: 1
- type: fabricspark
- retry_all: true
-```
-
-### Reporting bugs and contributing code
-
-- Want to report a bug or request a feature? Let us know on [Slack](http://slack.getdbt.com/), or open [an issue](https://github.com/microsoft/dbt-fabricspark/issues/new).
-
-## Code of Conduct
-
-Everyone interacting in the Microsoft project's codebases, issue trackers, and mailing lists is expected to follow the [PyPA Code of Conduct](https://www.pypa.io/en/latest/code-of-conduct/).
-
-## Join the dbt Community
-
-- Be part of the conversation in the [dbt Community Slack](http://community.getdbt.com/)
-- Read more on the [dbt Community Discourse](https://discourse.getdbt.com)
-
-## Reporting bugs and contributing code
-
-- Want to report a bug or request a feature? Let us know on [Slack](http://community.getdbt.com/), or open [an issue](https://github.com/microsoft/dbt-fabricspark/issues/new)
-- Want to help us build dbt? Check out the [Contributing Guide](https://github.com/microsoft/dbt-fabricspark/blob/HEAD/CONTRIBUTING.md)
-
-## Code of Conduct
-
-Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [dbt Code of Conduct](https://community.getdbt.com/code-of-conduct).
+# dbt-fabricspark
+
+`dbt-fabricspark` is an adapter that connects dbt to Microsoft Fabric Lakehouse, with PySpark support for efficient data processing and transformation.
+
+## Enhancements
+- **PySpark Support**: Run PySpark code within dbt to perform complex data transformations.
+- **Enhanced Functionality**: Bug fixes and optimizations for better compatibility with Microsoft Fabric Lakehouse.
+- **Seamless Integration**: Fully compatible with dbt’s existing workflow (i.e., use the same dbt-core package to manage both your warehouse and your lakehouse).
+
+[Demo video (Bilibili)](https://www.bilibili.com/video/BV1jcQnYsEh3)
+*Click to play the video*
+
+## Quick Start
+
+This quick demo shows how to convert parent-child pairs into hierarchical path strings in a Microsoft Fabric Lakehouse using a dbt project. A practical use case is displaying the hierarchy of a financial chart of accounts (COA), such as `EBITDA > NOI > Revenue > Sales Income`. To keep the example simple, Shanghai administrative-division data is used instead.
+
+**Input Data Example**
+
+| code | name |parent|
+|---------|----------|------|
+| 31| 上海市| null|
+| 3101| 市辖区| 31|
+| 310101| 黄浦区| 3101|
+|310101002|南京东路街道|310101|
+|310101013| 外滩街道|310101|
+| 310104| 徐汇区| 3101|
+|310104003| 天平路街道|310104|
+|310104004| 湖南路街道|310104|
+
+
+**Output Data Example**
+
+| code | name |parent| hierarchy_path |
+|---------|----------|------|----------------------------------|
+| 31| 上海市| null|上海市 |
+| 3101| 市辖区| 31|上海市 > 市辖区 |
+| 310101| 黄浦区| 3101|上海市 > 市辖区 > 黄浦区 |
+|310101002|南京东路街道|310101|上海市 > 市辖区 > 黄浦区 > 南京东路街道|
+|310101013| 外滩街道|310101|上海市 > 市辖区 > 黄浦区 > 外滩街道 |
+| 310104| 徐汇区| 3101|上海市 > 市辖区 > 徐汇区 |
+|310104003| 天平路街道|310104|上海市 > 市辖区 > 徐汇区 > 天平路街道 |
+|310104004| 湖南路街道|310104|上海市 > 市辖区 > 徐汇区 > 湖南路街道 |
+
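+**The Core Idea**
+
+Independent of Spark, the transformation is just a walk up the parent chain until the root is reached. Here is a minimal plain-Python sketch with a few made-up rows (the dbt model in step 3 implements the same idea with a Spark UDF over the full dataset):
+
+```python
+# code -> (name, parent); None marks the root
+rows = {"31": ("上海市", None), "3101": ("市辖区", "31"), "310101": ("黄浦区", "3101")}
+
+def hierarchy_path(code: str) -> str:
+    names = []
+    while code is not None:          # climb from the node to the root
+        name, parent = rows[code]
+        names.append(name)
+        code = parent
+    return " > ".join(reversed(names))  # root first
+
+print(hierarchy_path("310101"))  # 上海市 > 市辖区 > 黄浦区
+```
+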
+**Steps**
+
+1. Build & Install.
+
+ ```bash
+   # Build the dbt-fabricspark package (the wheel is written to the repository-level dist/ directory).
+   cd scripts
+   make build
+   latest_built=$(ls -t "$PWD"/../dist/dbt_fabricspark_custom-*.whl | head -1)
+
+ # Install the dbt-fabricspark package into your Python virtual environment.
+ cd /path/to/your/working/directory
+ workdir=$(pwd)
+ # Create virtual environment
+ python -m venv --prompt dbt-fabricspark .dbt-fabricspark
+ echo "export PS1=\"(dbt-fabricspark) \h:\W \u\$ \"" >> .dbt-fabricspark/bin/activate
+ echo "export DBT_PROFILES_DIR=\"${workdir}/projects\"" >> .dbt-fabricspark/bin/activate
+ source .dbt-fabricspark/bin/activate
+
+ # Upgrade pip & Install dbt-fabricspark
+ python -m pip install --upgrade pip
+ python -m pip install ${latest_built}
+ ```
+
+   Prepare your project structure as shown below:
+ ```bash
+ .
+ ├── .dbt-fabricspark # Python virtual environment
+ └── projects # dbt project
+ ├── playground # demo project for Fabric Lakehouse
+ │ ├── dbt_project.yml # project config
+ │ ├── models
+ │ └── profile_template.yml
+       ├── jaffle_shop          # demo project for Fabric Warehouse. Remove it if you don't want to use it.
+ │ ├── LICENSE
+ │ ├── README.md
+ │ ├── dbt_project.yml # project config
+ │ ├── etc
+ │ ├── logs
+ │ ├── models
+ │ ├── seeds
+ │ └── target
+ └── profiles.yml # dbt profiles
+ ```
+
+
+2. Configure: Update your `profiles.yml` with Microsoft Fabric Lakehouse credentials.
+
+ ```yaml
+ flags:
+ partial_parse: true
+ # dbt-fabricspark project, Fabric Lakehouse Spark Endpoint(via Livy)
+ playground:
+ target: fabricspark-dev
+ outputs:
+ fabricspark-dev:
+ authentication: CLI
+ method: livy
+ connect_retries: 0
+ connect_timeout: 1800000
+ endpoint: https://api.fabric.microsoft.com/v1
+ workspaceid: the-workspaceid-within-livy-endpoint # replace with yours
+ lakehouseid: the-lakehouseid-within-livy-endpoint # replace with yours
+ lakehouse: dl_playground
+ schema: dbo
+ threads: 1
+ type: fabricspark
+ retry_all: true
+ ```
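+
+   With `authentication: CLI`, the adapter acquires tokens through the Azure CLI credential, so make sure you are signed in before running dbt (a minimal sketch; `dbt debug` is just an optional sanity check):
+
+   ```bash
+   az login     # interactive sign-in; re-run when the cached token expires
+   dbt debug    # run from projects/playground to verify the profile can open a Livy session
+   ```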
+
+3. Create Models: Define PySpark models in .py files within the `projects/playground/models/` directory.
+
+ test_pyspark_model.py
+
+ ```python
+ import pandas as pd
+ from pyspark.sql import DataFrame
+ from pyspark.sql.types import ArrayType, StringType
+ from io import StringIO
+
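+   # Decode the compact '#'-delimited tree string below into code/name/parent rows and return them as a Spark DataFrame.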
+ def generate_data(sparkSession)->DataFrame:
+ tree = '31:上海市#$01:市辖区#%01:黄浦区#|002:南京东路街道#|013:外滩街道#|015:半淞园路街道#|017:小东门街道#|018:豫园街道#|019:老西门街道#|020:五里桥街道#|021:打浦桥街道#|022:淮海中路街道#|023:瑞金二路街道#%04:徐汇区#|003:天平路街道#|004:湖南路街道#|007:斜土路街道#|008:枫林路街道#|010:长桥街道#|011:田林街道#|012:虹梅路街道 #|013:康健新村街道#|014:徐家汇街道#|015:凌云路街道#|016:龙华街道#|017:漕河泾街道#|103:华泾镇#|501:漕河泾新兴技术开发区#%05:长宁区#|001:华阳路街道#|002:江苏路街道#|004:新华路街道#|005:周家桥街道#|006:天山路街道#|008:仙霞新村街道#|009:虹桥街道#|010:程家桥街道#|011:北新泾街道#|102:新泾镇#%06:静安区#|006:江宁路街道#|011:石门 二路街道#|012:南京西路街道#|013:静安寺街道#|014:曹家渡街道#|015:天目西路街道#|016:北站街道#|017:宝山路街道#|018:共和新路街道#|019:大宁路街道#|020:彭浦新村街道#|021:临汾路街道#|022:芷江西路街道#|101:彭浦镇#%07:普陀区#|005:曹杨新村街道#|014:长风新村街道#|015:长寿路街道#|016:甘泉路街道#|017:石泉路街道#|020:宜川路街道#|021:万 里街道#|022:真如镇街道#|102:长征镇#|103:桃浦镇#%09:虹口区#|009:欧阳路街道#|010:曲阳路街道#|011:广中路街道#|014:嘉兴路街道#|016:凉城新村街道#|017:四川北路街道#|018:北外滩街道#|019:江湾镇街道#%10:杨浦区#|001:定海路街道#|006:平凉路街道#|008:江浦路街道#|009:四平路街道#|012:控江路街道#|013:长白新村街道#|015:延吉新村街道#|016: 殷行街道#|018:大桥街道#|019:五角场街道#|020:新江湾城街道#|021:长海路街道#%12:闵行区#|001:江川路街道#|006:古美街道#|008:新虹街道#|009:浦锦街道#|101:莘庄镇#|102:七宝镇#|103:颛桥镇#|106:华漕镇#|107:虹桥镇#|108:梅陇镇#|110:吴泾镇#|112:马桥镇#|114:浦江镇#|501:莘庄工业区#%13:宝山区#|003:友谊路街道#|007:吴淞街道#|008:张庙街道 #|101:罗店镇#|102:大场镇#|103:杨行镇#|104:月浦镇#|106:罗泾镇#|109:顾村镇#|111:高境镇#|112:庙行镇#|113:淞南镇#|501:宝山工业园区#%14:嘉定区#|001:新成路街道#|002:真新街道#|004:嘉定镇街道#|102:南翔镇#|103:安亭镇#|106:马陆镇#|109:徐行镇#|111:华亭镇#|114:外冈镇#|118:江桥镇#|401:菊园新区#|501:嘉定工业区#%15:浦东新区#|004: 潍坊新村街道#|005:陆家嘴街道#|007:周家渡街道#|008:塘桥街道#|009:上钢新村街道#|010:南码头路街道#|011:沪东新村街道#|012:金杨新村街道#|013:洋泾街道#|014:浦兴路街道#|015:东明路街道#|016:花木街道#|103:川沙新镇#|104:高桥镇#|105:北蔡镇#|110:合庆镇#|114:唐镇#|117:曹路镇#|120:金桥镇#|121:高行镇#|123:高东镇#|125:张江镇#|130:三林 镇#|131:惠南镇#|132:周浦镇#|133:新场镇#|134:大团镇#|136:康桥镇#|137:航头镇#|139:祝桥镇#|140:泥城镇#|141:宣桥镇#|142:书院镇#|143:万祥镇#|144:老港镇#|145:南汇新城镇#|401:芦潮港农场#|402:东海农场#|403:朝阳农场#|501:中国(上海)自由贸易试验区(保税片区)#|502:金桥经济技术开发区#|503:张江高科技园区#%16:金山区#|001:石化街道#|1 01:朱泾镇#|102:枫泾镇#|103:张堰镇#|104:亭林镇#|105:吕巷镇#|107:廊下镇#|109:金山卫镇#|112:漕泾镇#|113:山阳镇#|503:上海湾区高新技术产业开发区#%17:松江区#|001:岳阳街道#|002:永丰街道#|003:方松街道#|004:中山街道#|005:广富林街道#|006:九里亭街道#|102:泗泾镇#|103:佘山镇#|104:车墩镇#|105:新桥镇#|106:洞泾镇#|107:九亭镇#|109:泖 港镇#|116:石湖荡镇#|117:新浜镇#|120:叶榭镇#|121:小昆山镇#|501:松江工业区#|504:佘山度假区#|507:上海松江出口加工区#%18:青浦区#|001:夏阳街道#|002:盈浦街道#|003:香花桥街道#|102:朱家角镇#|103:练塘镇#|104:金泽镇#|105:赵巷镇#|106:徐泾镇#|107:华新镇#|109:重固镇#|110:白鹤镇#%20:奉贤区#|001:西渡街道#|002:奉浦街道#|003:金海街道#| 101:南桥镇#|102:奉城镇#|104:庄行镇#|106:金汇镇#|109:四团镇#|111:青村镇#|118:柘林镇#|123:海湾镇#|503:海湾旅游区#%51:崇明区#|101:城桥镇#|102:堡镇#|103:新河镇#|104:庙镇#|105:竖新镇#|106:向化镇#|107:三星镇#|108:港沿镇#|109:中兴镇#|110:陈家镇#|111:绿华镇#|112:港西镇#|113:建设镇#|114:新海镇#|115:东平镇#|116:长兴镇#|201: 新村乡#|202:横沙乡#|401:前卫农场#|402:东平林场#|501:上实现代农业园区'
+ data=['code,name,parent']
+ p=['']*4
+ for node in tree.split('#'):
+ i = '$%|'.find(node[0])+1
+ [k,v] = node.strip('$%|').split(':')
+ p[i]=k
+ data.append(''.join(p[:i+1])+f",{v},"+''.join(p[:i]))
+
+ csv_content = '\n'.join(data)
+ pandas_df = pd.read_csv(StringIO(csv_content),dtype=str)
+ spark_df = sparkSession.createDataFrame(pandas_df)
+ return spark_df
+
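+   # UDF body: walk the code -> parent mapping from the target node up to the root and return the visited nodes (target first).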
+ def find_hierarchy_path(linked_list:ArrayType, target_node:StringType)->ArrayType(StringType()):
+ result = []
+ visited = set()
+ next_node = target_node
+
+ map_dict = {}
+ for item in linked_list:
+ map_dict.update(item)
+
+ while next_node in map_dict and next_node not in visited:
+ result.append(next_node)
+ visited.add(next_node)
+ next_node = map_dict.get(next_node)
+
+ return result
+
+ def reverse_array(input:ArrayType)->ArrayType(StringType()):
+ return input[::-1]
+
+ def model(dbt, session) -> DataFrame:
+ spark_df = generate_data(session)
+ spark_df.show()
+
+ stg_table="stg_prop_city_dist"
+ lakehouseName="dl_playground"
+ spark_df.createOrReplaceTempView(stg_table)
+
+ session.udf.register("find_hierarchy_path", find_hierarchy_path, ArrayType(StringType()))
+ session.udf.register("reverse_array", reverse_array, ArrayType(StringType()))
+
+ sql_stmt = f"""
+ select
+ code
+ ,name
+ ,parent
+ ,regexp_replace(concat_ws(' > ', reverse_array(find_hierarchy_path(nodes, code_name))),'[0-9]*::', '') as hierarchy_path
+ from (
+ select
+ cur.code
+ ,cur.name
+ ,concat(cur.code, '::', cur.name) as code_name
+ ,cur.parent
+ ,collect_list(
+ map(
+ concat(cur.code, '::', cur.name)
+ ,case when parent.code is null or cur.parent is null or lower(trim(cur.parent)) = 'null'
+ then concat(cur.code, '::', cur.name)
+ else concat(parent.code, '::', parent.name)
+ end)
+ ) OVER () as nodes
+ from {stg_table} as cur
+ left join {stg_table} as parent
+ on cur.parent = parent.code
+ ) tbl
+ """
+ final_df = session.sql(sql_stmt)
+ return final_df
+ ```
+
+
+4. Test the PySpark model:
+ ```bash
+   cd projects/playground
+ dbt run
+ ```
+
+
+ ```bash
+ 13:28:24 Running with dbt=1.9.1
+ 13:28:24 Registered adapter: fabricspark=1.9.1-rc2
+ 13:28:25 Found 18 models, 591 macros
+ 13:28:25
+ 13:28:25 Concurrency: 1 threads (target='fabricspark-dev')
+ 13:28:25
+ Creating Livy session (this may take a few minutes)
+ Livy session created successfully
+ 13:32:04 1 of 1 START python table model dl_playground.dbo.test_pyspark_model ........... [RUN]
+ ['31', '上海市', '上海市']
+ ['3101', '市辖区', '31', '上海市 > 市辖区']
+ ['310101', '黄浦区', '3101', '上海市 > 市辖区 > 黄浦区']
+ ['310101002', '南京东路街道', '310101', '上海市 > 市辖区 > 黄浦区 > 南京东路街道']
+ ['310101013', '外滩街道', '310101', '上海市 > 市辖区 > 黄浦区 > 外滩街道']
+ ['310101015', '半淞园路街道', '310101', '上海市 > 市辖区 > 黄浦区 > 半淞园路街道']
+ ['310101017', '小东门街道', '310101', '上海市 > 市辖区 > 黄浦区 > 小东门街道']
+ ['310101018', '豫园街道', '310101', '上海市 > 市辖区 > 黄浦区 > 豫园街道']
+ ['310101019', '老西门街道', '310101', '上海市 > 市辖区 > 黄浦区 > 老西门街道']
+ ['310101020', '五里桥街道', '310101', '上海市 > 市辖区 > 黄浦区 > 五里桥街道']
+ 13:32:34 1 of 1 OK created python table model dl_playground.dbo.test_pyspark_model ...... [OK in 29.31s]
+ 13:32:45
+ 13:32:45 Finished running 1 table model in 0 hours 4 minutes and 20.19 seconds (260.19s).
+ 13:32:45
+ 13:32:45 Completed successfully
+ 13:32:45
+ 13:32:45 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
+ ```
+
+## Changelog
+See [CHANGELOG.md](./CHANGELOG.md) for details on updates and version changes.
diff --git a/dbt/adapters/fabricspark/__version__.py b/dbt/adapters/fabricspark/__version__.py
index 21a042b..7dd1dec 100644
--- a/dbt/adapters/fabricspark/__version__.py
+++ b/dbt/adapters/fabricspark/__version__.py
@@ -1 +1 @@
-version = "1.7.0rc1"
+version = "1.9.1rc2"
diff --git a/dbt/adapters/fabricspark/column.py b/dbt/adapters/fabricspark/column.py
index 9f9e7d8..23a581e 100644
--- a/dbt/adapters/fabricspark/column.py
+++ b/dbt/adapters/fabricspark/column.py
@@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, TypeVar, Union
from dbt.adapters.base.column import Column
-from dbt.dataclass_schema import dbtClassMixin
+from dbt_common.dataclass_schema import dbtClassMixin
Self = TypeVar("Self", bound="SparkColumn")
diff --git a/dbt/adapters/fabricspark/connections.py b/dbt/adapters/fabricspark/connections.py
index 7804e11..7ddd49f 100644
--- a/dbt/adapters/fabricspark/connections.py
+++ b/dbt/adapters/fabricspark/connections.py
@@ -1,17 +1,17 @@
from contextlib import contextmanager
import os
-import dbt.exceptions
+import dbt.adapters.exceptions
from dbt.adapters.sql import SQLConnectionManager
-from dbt.contracts.connection import ConnectionState, AdapterResponse
-from dbt.events import AdapterLogger
-from dbt.events.functions import fire_event
-from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
+from dbt.adapters.contracts.connection import ConnectionState, AdapterResponse
+from dbt.adapters.events.logging import AdapterLogger
+from dbt_common.events.functions import fire_event
+from dbt.adapters.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
from dbt.utils import DECIMALS
from dbt.adapters.fabricspark.livysession import LivySessionConnectionWrapper, LivySessionManager
-from dbt.contracts.connection import Connection
-from dbt.dataclass_schema import StrEnum
+from dbt.adapters.contracts.connection import Connection
+from dbt_common.dataclass_schema import StrEnum
from typing import Any, Optional, Union, Tuple, List, Generator, Iterable, Sequence
from abc import ABC, abstractmethod
import time
@@ -163,7 +163,7 @@ def open(cls, connection: Connection) -> Connection:
msg = "Failed to connect"
if creds.token is not None:
msg += ", is your token valid?"
- raise dbt.exceptions.FailedToConnectError(msg) from e
+ raise dbt.adapters.exceptions.FailedToConnectError(msg) from e
retryable_message = _is_retryable_error(e)
if retryable_message and creds.connect_retries > 0:
msg = (
@@ -184,12 +184,12 @@ def open(cls, connection: Connection) -> Connection:
logger.warning(msg)
time.sleep(creds.connect_timeout)
else:
- raise dbt.exceptions.FailedToConnectError("failed to connect") from e
+ raise dbt.adapters.exceptions.FailedToConnectError("failed to connect") from e
else:
raise exc # type: ignore
if handle is None:
- raise dbt.exceptions.FailedToConnectError("Failed to connect to Livy session. Common reasons for errors: \n1. Invalid/expired credentials (if using CLI authentication, re-run `az login` in your terminal) \n2. Invalid endpoint \n3. Invalid workspaceid or lakehouseid (do you have the correct permissions?) \n4. Invalid or non-existent shortcuts json path, or improperly formatted shortcuts")
+ raise dbt.adapters.exceptions.FailedToConnectError("Failed to connect to Livy")
connection.handle = handle
connection.state = ConnectionState.OPEN
return connection
@@ -240,7 +240,7 @@ def fetch_spark_version(cls, connection) -> None:
try:
sql = "split(version(), ' ')[0] as version"
cursor = connection.handle.cursor()
- cursor.execute(sql)
+ cursor.execute(sql, 'sql') # add language parameter
res = cursor.fetchall()
SparkConnectionManager.spark_version = res[0][0]
@@ -252,9 +252,11 @@ def fetch_spark_version(cls, connection) -> None:
os.environ["DBT_SPARK_VERSION"] = SparkConnectionManager.spark_version
logger.debug(f"SPARK VERSION {os.getenv('DBT_SPARK_VERSION')}")
+ # Add language parameter.
def add_query(
self,
sql: str,
+ language: str = 'sql',
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
@@ -276,7 +278,7 @@ def add_query(
cursor = connection.handle.cursor()
try:
- cursor.execute(sql, bindings)
+ cursor.execute(sql, language, bindings)
except Exception as ex:
query_exception = ex
diff --git a/dbt/adapters/fabricspark/fabric_spark_credentials.py b/dbt/adapters/fabricspark/fabric_spark_credentials.py
index 6a5dd4b..8c28baa 100644
--- a/dbt/adapters/fabricspark/fabric_spark_credentials.py
+++ b/dbt/adapters/fabricspark/fabric_spark_credentials.py
@@ -1,4 +1,4 @@
-from dbt.adapters.base import Credentials
+from dbt.adapters.contracts.connection import Credentials
from typing import Any, Dict, Optional, Tuple
from dataclasses import dataclass, field
import dbt.exceptions
@@ -56,7 +56,8 @@ def __post_init__(self) -> None:
# f"On Spark, lakehouse must be omitted or have the same value as"
# # f" schema."
# # )
- self.schema = self.lakehouse
+ self.schema = f"{self.lakehouse}.{self.schema}" # Support Fabric Lakehouse schema
+ # self.schema = self.lakehouse
@property
def type(self) -> str:
diff --git a/dbt/adapters/fabricspark/impl.py b/dbt/adapters/fabricspark/impl.py
index 521e9d8..ec71774 100644
--- a/dbt/adapters/fabricspark/impl.py
+++ b/dbt/adapters/fabricspark/impl.py
@@ -1,25 +1,28 @@
import re
from concurrent.futures import Future
from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Union, Tuple, Callable, Set
+from typing import Any, Dict, Iterable, List, Optional, Union, Tuple, Callable, Set, Type
from dbt.adapters.base.relation import InformationSchema
from dbt.contracts.graph.manifest import Manifest
from typing_extensions import TypeAlias
import agate
import dbt
import dbt.exceptions
-from dbt.adapters.base import AdapterConfig
+from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed, ConstraintSupport
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.fabricspark import SparkConnectionManager
from dbt.adapters.fabricspark import SparkRelation
from dbt.adapters.fabricspark import SparkColumn
from dbt.adapters.base import BaseRelation
-from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
+from dbt_common.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.contracts.graph.nodes import ConstraintType
-from dbt.contracts.relation import RelationType
-from dbt.events import AdapterLogger
-from dbt.utils import executor, AttrDict
+from dbt.adapters.contracts.relation import RelationType
+from dbt.adapters.events.logging import AdapterLogger
+from dbt_common.utils import executor, AttrDict
+
+from dbt.adapters.fabricspark.python_submissions import BaseFabricSparkHelper
+from dbt.adapters.contracts.connection import AdapterResponse
logger = AdapterLogger("fabricspark")
@@ -439,7 +442,7 @@ def get_rows_different_sql(
def run_sql_for_tests(self, sql, fetch, conn):
cursor = conn.handle.cursor()
try:
- cursor.execute(sql)
+ cursor.execute(sql, "sql") # Add language parameter.
if fetch == "one":
if hasattr(cursor, "fetchone"):
return cursor.fetchone()
@@ -456,6 +459,28 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False
+ # Add Python Model Support
+ @property
+ def default_python_submission_method(self) -> str:
+ return "livy_session_statement"
+
+ # Add Python Model Support
+ @property
+ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
+        # TODO: Figure out the submission_method enumeration to enable Livy batch jobs.
+        # The current submission method resolves to 'workflow_job', but what we actually need is a Livy session statement job.
+        # The keyword 'workflow_job' cannot be found in dbt-spark, dbt-fabricspark, or dbt-core.
+        # I assume the two job types are 'livy_session_batch' and 'livy_session_statement'.
+ return {
+ "workflow_job": BaseFabricSparkHelper,
+ "livy_session_statement": BaseFabricSparkHelper,
+ "livy_session_batch": BaseFabricSparkHelper,
+ }
+
+ # Add Python Model Support
+ def generate_python_submission_response(self, submission_result: Any) -> AdapterResponse:
+ return self.connections.get_response(None)
+
def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
grants_dict: Dict[str, List[str]] = {}
for row in grants_table:
@@ -475,7 +500,7 @@ def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
def debug_query(self) -> None:
"""Override for DebugTask method"""
- self.execute("select 1 as id")
+ self.execute("select 1 as id", "sql") # Add language parameter.
# spark does something interesting with joins when both tables have the same
diff --git a/dbt/adapters/fabricspark/livysession.py b/dbt/adapters/fabricspark/livysession.py
index 25d4f58..371a8a1 100644
--- a/dbt/adapters/fabricspark/livysession.py
+++ b/dbt/adapters/fabricspark/livysession.py
@@ -9,12 +9,14 @@
from types import TracebackType
from typing import Any
import dbt.exceptions
-from dbt.events import AdapterLogger
+from dbt.adapters.events.logging import AdapterLogger
from dbt.utils import DECIMALS
from azure.core.credentials import AccessToken
from azure.identity import AzureCliCredential, ClientSecretCredential
from dbt.adapters.fabricspark.fabric_spark_credentials import SparkCredentials
from dbt.adapters.fabricspark.shortcuts import ShortcutClient
+import textwrap
+from dbt_common.exceptions import DbtDatabaseError
logger = AdapterLogger("Microsoft Fabric-Spark")
NUMBERS = DECIMALS + (int, float)
@@ -101,7 +103,7 @@ def get_headers(credentials: SparkCredentials, tokenPrint: bool = False) -> dict
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {accessToken.token}"}
if tokenPrint:
- logger.debug(accessToken.token)
+ logger.debug("accessToken:"+accessToken.token)
return headers
@@ -193,9 +195,6 @@ def delete_session(self) -> None:
logger.error(f"Unable to close the livy session {self.session_id}, error: {ex}")
def is_valid_session(self) -> bool:
- if (self.session_id is None):
- logger.error("Session ID is None")
- return False
res = requests.get(
self.connect_url + "/sessions/" + self.session_id,
headers=get_headers(self.credential, False),
@@ -279,13 +278,14 @@ def close(self) -> None:
"""
self._rows = None
- def _submitLivyCode(self, code) -> Response:
+    # Add language parameter for different workloads (sql/pyspark).
+ def _submitLivyCode(self, code, language="sql") -> Response:
if self.livy_session.is_new_session_required:
LivySessionManager.connect(self.credential)
self.session_id = self.livy_session.session_id
- # Submit code
- data = {"code": code, "kind": "sql"}
+ # Submit code. Enable pyspark tasks.
+ data = {"code": code, "kind": language}
logger.debug(
f"Submitted: {data} {self.connect_url + '/sessions/' + self.session_id + '/statements'}"
)
@@ -309,6 +309,10 @@ def _getLivySQL(self, sql) -> str:
code = re.sub(r"\s*/\*(.|\n)*?\*/\s*", "\n", sql, re.DOTALL).strip()
return code
+    # Trim any common leading whitespace from Python code.
+ def _getLivyPyspark(self, code) -> str:
+ return textwrap.dedent(code)
+
def _getLivyResult(self, res_obj) -> Response:
json_res = res_obj.json()
while True:
@@ -326,14 +330,17 @@ def _getLivyResult(self, res_obj) -> Response:
return res
time.sleep(DEFAULT_POLL_STATEMENT_WAIT)
- def execute(self, sql: str, *parameters: Any) -> None:
+ # Support pyspark tasks.
+ def execute(self, code: str, language: str, *parameters: Any) -> None:
"""
- Execute a sql statement.
+ Execute a sql statement or a pyspark statement.
Parameters
----------
- sql : str
- Execute a sql statement.
+ code : str
+ Execute a sql statement or a pyspark statement.
+ language : str
+ Specify workloads: sql/pyspark
*parameters : Any
The parameters.
@@ -346,15 +353,20 @@ def execute(self, sql: str, *parameters: Any) -> None:
------
https://github.com/mkleehammer/pyodbc/wiki/Cursor#executesql-parameters
"""
+ # print("LivyCursor.execute()".center(80,'-'))
+ # print(f"language={language}")
+ # print(code)
if len(parameters) > 0:
- sql = sql % parameters
+ code = code % parameters
# TODO: handle parameterised sql
- res = self._getLivyResult(self._submitLivyCode(self._getLivySQL(sql)))
+        # final processing before submission
+ final_code = self._getLivyPyspark(code) if language == "pyspark" else self._getLivySQL(code)
+
+ res = self._getLivyResult(self._submitLivyCode(final_code, language))
logger.debug(res)
if res["output"]["status"] == "ok":
- # values = res['output']['data']['application/json']
values = res["output"]["data"]["application/json"]
if len(values) >= 1:
self._rows = values["data"] # values[0]['values']
@@ -367,8 +379,7 @@ def execute(self, sql: str, *parameters: Any) -> None:
else:
self._rows = None
self._schema = None
-
- raise dbt.exceptions.DbtDatabaseError(
+ raise DbtDatabaseError(
"Error while executing query: " + res["output"]["evalue"]
)
@@ -481,11 +492,8 @@ def connect(credentials: SparkCredentials) -> LivyConnection:
__class__.livy_global_session.is_new_session_required = False
# create shortcuts, if there are any
if credentials.shortcuts_json_path:
- try:
- shortcut_client = ShortcutClient(accessToken.token, credentials.workspaceid, credentials.lakehouseid, credentials.endpoint)
- shortcut_client.create_shortcuts(credentials.shortcuts_json_path)
- except Exception as ex:
- logger.error(f"Unable to create shortcuts: {ex}")
+ shortcut_client = ShortcutClient(accessToken.token, credentials.workspaceid, credentials.lakehouseid)
+ shortcut_client.create_shortcuts(credentials.shortcuts_json_path)
elif not __class__.livy_global_session.is_valid_session():
__class__.livy_global_session.delete_session()
__class__.livy_global_session.create_session(data)
@@ -500,11 +508,9 @@ def connect(credentials: SparkCredentials) -> LivyConnection:
@staticmethod
def disconnect() -> None:
- if __class__.livy_global_session is not None and __class__.livy_global_session.is_valid_session():
+ if __class__.livy_global_session.is_valid_session():
__class__.livy_global_session.delete_session()
__class__.livy_global_session.is_new_session_required = True
- else:
- logger.debug("No session to disconnect")
class LivySessionConnectionWrapper(object):
@@ -530,15 +536,23 @@ def rollback(self, *args, **kwargs):
def fetchall(self):
return self._cursor.fetchall()
- def execute(self, sql, bindings=None):
+ # Add language parameter
+ def execute(self, sql, language, bindings=None):
+ # print("LivySessionConnectionWrapper.execute()".center(80,'-'))
+ # print(f"language={language}")
+ # TODO: get model language from job context and use that here
+ if language not in ['sql','pyspark']:
+ language = 'sql'
+ if 'def model(dbt, session):' in sql:
+ language = 'pyspark'
if sql.strip().endswith(";"):
sql = sql.strip()[:-1]
if bindings is None:
- self._cursor.execute(sql)
+ self._cursor.execute(sql, language)
else:
bindings = [self._fix_binding(binding) for binding in bindings]
- self._cursor.execute(sql, *bindings)
+ self._cursor.execute(sql, language, *bindings)
@property
def description(self):
diff --git a/dbt/adapters/fabricspark/python_submissions.py b/dbt/adapters/fabricspark/python_submissions.py
new file mode 100644
index 0000000..7a3e436
--- /dev/null
+++ b/dbt/adapters/fabricspark/python_submissions.py
@@ -0,0 +1,58 @@
+from typing import Dict
+
+from dbt.adapters.base import PythonJobHelper
+from dbt.adapters.contracts.connection import Connection
+from dbt_common.exceptions import DbtRuntimeError,DbtDatabaseError
+
+from dbt.adapters.fabricspark import SparkCredentials
+from dbt.adapters.fabricspark.livysession import LivySessionManager
+
+class BaseFabricSparkHelper(PythonJobHelper):
+ """
+ Implementation of PythonJobHelper for FabricSpark.
+ """
+ def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:
+ """
+ Initialize Spark Job Submission helper.
+
+ Parameters
+ ----------
+ parsed_model(Dict)
+ A dictionary containing the parsed model information, used to extract various configurations required for job submission.
+ credentials(SparkCredentials)
+ A SparkCredentials object containing the credentials needed to access the Spark cluster, used to establish the connection.
+ """
+ self.credentials = credentials
+ self.relation_name = parsed_model.get('relation_name')
+ self.original_file_path = parsed_model.get('original_file_path')
+ self.submission_method = parsed_model.get('config',{}).get('submission_method')
+ self.connection = self._get_or_create_connection()
+
+ def _get_or_create_connection(self) -> Connection:
+ """
+ Get the existing Livy connection, or create one using SparkConnectionManager if it does not exist.
+ """
+ connection = LivySessionManager.connect(self.credentials)
+ return connection
+
+ def submit(self, compiled_code: str) -> None:
+ """
+ Submits compiled code to the database and handles execution results or errors.
+
+ Parameters
+ ----------
+ compiled_code (str):
+ The compiled code string to be executed.
+
+ Raises
+ ------
+ DbtRuntimeError
+ """
+
+ cursor = self.connection.cursor()
+ try:
+ cursor.execute(compiled_code, 'pyspark')
+ for line in cursor.fetchall():
+ print(line)
+ except DbtDatabaseError as ex:
+ raise DbtRuntimeError(f"Unable to create model {self.relation_name}(file: {self.original_file_path}) with a {self.submission_method} type submission. Caused by:\n{ex.msg}")
diff --git a/dbt/adapters/fabricspark/relation.py b/dbt/adapters/fabricspark/relation.py
index 55dc1a7..7052a92 100644
--- a/dbt/adapters/fabricspark/relation.py
+++ b/dbt/adapters/fabricspark/relation.py
@@ -4,7 +4,7 @@
from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.exceptions import DbtRuntimeError
-from dbt.events import AdapterLogger
+from dbt.adapters.events.logging import AdapterLogger
logger = AdapterLogger("fabricspark")
diff --git a/dbt/adapters/fabricspark/shortcuts.py b/dbt/adapters/fabricspark/shortcuts.py
index 776a809..c714181 100644
--- a/dbt/adapters/fabricspark/shortcuts.py
+++ b/dbt/adapters/fabricspark/shortcuts.py
@@ -3,7 +3,7 @@
from dataclasses import dataclass
from typing import Optional
from enum import Enum
-from dbt.events import AdapterLogger
+from dbt.adapters.events.logging import AdapterLogger
logger = AdapterLogger("Microsoft Fabric-Spark")
@@ -60,11 +60,11 @@ def __str__(self):
"""
return f"Shortcut: {self.shortcut_name} from {self.source_path} to {self.path}"
- def connect_url(self, endpoint: str = "https://api.fabric.microsoft.com/v1"):
+ def connect_url(self):
"""
Returns the connect URL for the shortcut.
"""
- return f"{endpoint}/workspaces/{self.source_workspace_id}/items/{self.source_item_id}/shortcuts/{self.source_path}/{self.shortcut_name}"
+ return f"https://api.fabric.microsoft.com/v1/workspaces/{self.source_workspace_id}/items/{self.source_item_id}/shortcuts/{self.source_path}/{self.shortcut_name}"
def get_target_body(self):
"""
@@ -81,7 +81,7 @@ def get_target_body(self):
class ShortcutClient:
- def __init__(self, token: str, workspace_id: str, item_id: str, endpoint: str = "https://api.fabric.microsoft.com/v1"):
+ def __init__(self, token: str, workspace_id: str, item_id: str):
"""
Initializes a ShortcutClient object.
@@ -93,7 +93,6 @@ def __init__(self, token: str, workspace_id: str, item_id: str, endpoint: str =
self.token = token
self.workspace_id = workspace_id
self.item_id = item_id
- self.endpoint = endpoint
def parse_json(self, json_str: str):
"""
@@ -103,19 +102,15 @@ def parse_json(self, json_str: str):
json_str (str): The JSON string to parse.
"""
shortcuts = []
- try:
- parsed_json = json.loads(json_str)
- for shortcut in parsed_json:
- # convert string target to TargetName enum
- shortcut["target"] = TargetName(shortcut["target"])
- try:
- shortcut_obj = Shortcut(**shortcut)
- except Exception as e:
- raise ValueError(f"Could not parse shortcut: {shortcut} with error: {e}")
- shortcuts.append(shortcut_obj)
- return shortcuts
- except Exception as e:
- raise ValueError(f"Could not parse JSON: {json_str} with error: {e}")
+ for shortcut in json.loads(json_str):
+ # convert string target to TargetName enum
+ shortcut["target"] = TargetName(shortcut["target"])
+ try:
+ shortcut_obj = Shortcut(**shortcut)
+ except Exception as e:
+ raise ValueError(f"Could not parse shortcut: {shortcut} with error: {e}")
+ shortcuts.append(shortcut_obj)
+ return shortcuts
def create_shortcuts(self, json_path: str, max_retries: int = 3):
"""
@@ -153,7 +148,7 @@ def check_exists(self, shortcut: Shortcut):
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
- response = requests.get(shortcut.connect_url(self.endpoint), headers=headers)
+ response = requests.get(shortcut.connect_url(), headers=headers)
# check if the error is ItemNotFound
if response.status_code == 404:
return False
@@ -177,7 +172,7 @@ def delete_shortcut(self, shortcut_path: str, shortcut_name: str):
shortcut_path (str): The path where the shortcut is located.
shortcut_name (str): The name of the shortcut.
"""
- connect_url = f"{self.endpoint}/workspaces/{self.workspace_id}/items/{self.item_id}/shortcuts/{shortcut_path}/{shortcut_name}"
+ connect_url = f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}/items/{self.item_id}/shortcuts/{shortcut_path}/{shortcut_name}"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
@@ -196,7 +191,7 @@ def create_shortcut(self, shortcut: Shortcut):
if self.check_exists(shortcut):
logger.debug(f"Shortcut {shortcut} already exists, skipping...")
return
- connect_url = f"{self.endpoint}/workspaces/{self.workspace_id}/items/{self.item_id}/shortcuts"
+ connect_url = f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}/items/{self.item_id}/shortcuts"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
diff --git a/dbt/include/fabricspark/macros/materializations/models/table/table.sql b/dbt/include/fabricspark/macros/materializations/models/table/table.sql
index fee496b..7169667 100644
--- a/dbt/include/fabricspark/macros/materializations/models/table/table.sql
+++ b/dbt/include/fabricspark/macros/materializations/models/table/table.sql
@@ -29,9 +29,13 @@
{{ adapter.drop_relation(existing_relation.incorporate(type=old_relation_type)) }}
{% endif %}
- -- build model
+  -- build model (supports SQL and Python models)
{%- call statement('main', language=language) -%}
- {{ create_table_as(False, target_relation, compiled_code, language) }}
+ {%- if language == 'sql' -%}
+ {{ create_table_as(False, target_relation, compiled_code, language) }}
+ {%- elif language == 'python' -%}
+ {{ py_write_table(compiled_code, target_relation) }}
+ {% endif %}
{%- endcall -%}
/*
@@ -99,6 +103,14 @@ else:
raise Exception(msg)
df.write.mode("overwrite").format("{{ config.get('file_format', 'delta') }}").option("overwriteSchema", "true").saveAsTable("{{ target_relation }}")
+import json
+# Do not print the whole dataframe, only the first 10 rows for performance reasons
+sample_df = df.limit(10).na.fill("null")
+result = {
+ 'data': [ list(json.loads(row).values()) for row in sample_df.toJSON().collect() ],
+ 'schema': df.schema.jsonValue()
+}
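+# Livy's "%json" output magic below returns the "result" variable as application/json,
+# which is how the adapter reads the sampled rows and schema from the statement output.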
+%json result
{%- endmacro -%}
{%macro py_script_comment()%}
diff --git a/dbt/include/fabricspark/macros/materializations/models/view/read_lakehouse_file.sql b/dbt/include/fabricspark/macros/materializations/models/view/read_lakehouse_file.sql
new file mode 100644
index 0000000..85962ed
--- /dev/null
+++ b/dbt/include/fabricspark/macros/materializations/models/view/read_lakehouse_file.sql
@@ -0,0 +1,5 @@
+{% macro read_lakehouse_file(file_path, file_format) %}
+ {%- set file_full_path = "abfss://{workspaceid}@onelake.dfs.fabric.microsoft.com/{lakehouseid}/{file_path}".format(workspaceid=target.workspaceid,lakehouseid=target.lakehouseid,file_path=file_path) -%}
+ {{ log('Query on file: ' ~ file_full_path, info=True) }}
+ {{ file_format }}.`{{ file_full_path }}`
+{% endmacro %}
\ No newline at end of file
diff --git a/scripts/Makefile b/scripts/Makefile
new file mode 100644
index 0000000..974aa9c
--- /dev/null
+++ b/scripts/Makefile
@@ -0,0 +1,22 @@
+.DEFAULT_GOAL := help
+
+.PHONY: help build clean install rebuild
+
+help: ## Show usages
+ @echo "Usage:"
+ @grep -E '^[0-9a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
+
+build: ## Package dbt-fabricspark-custom & Clean temporary files
+ @echo "\033[0;32mStarting Build...\033[0m"
+ @python setup.py sdist bdist_wheel
+ @echo "\033[0;32mBuild complete!\033[0m"
+
+clean: ## Clean up all built packages & metadata
+ @echo "\033[0;31mCleaning up...\033[0m"
+ @rm -rf ../build ../dist ../dbt_fabricspark_custom.egg-info
+ @echo "\033[0;31mCleanup complete.\033[0m"
+
+install: ## Install package dbt-fabricspark-custom
+	@python -m pip install $(shell ls -t ../dist/dbt_fabricspark_custom-*.whl | head -n 1)
+
+rebuild: clean build ## Clean up & Build again
\ No newline at end of file
diff --git a/scripts/setup.py b/scripts/setup.py
new file mode 100644
index 0000000..d959a87
--- /dev/null
+++ b/scripts/setup.py
@@ -0,0 +1,45 @@
+import os
+import sys
+import site
+from setuptools import setup
+
+try:
+ from setuptools import find_namespace_packages
+except ImportError:
+ # the user has a downlevel version of setuptools.
+ print("Error: dbt requires setuptools v40.1.0 or higher.")
+ print('Please upgrade setuptools with "pip install --upgrade setuptools" ' "and try again")
+ sys.exit(1)
+
+base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
+print(base_dir)
+sys.path.append(base_dir)
+os.chdir(base_dir)
+
+from dbt.adapters.fabricspark.__version__ import version
+
+with open(os.path.join(base_dir, "CHANGELOG.md"), "r", encoding="utf8") as f:
+ long_description = f.read()
+
+setup(
+ name='dbt-fabricspark-custom',
+ version=version,
+ description="The Apache Spark adapter plugin for dbt",
+ long_description=long_description,
+ packages=find_namespace_packages(include=["dbt", "dbt.*","dbt_common.*"]),
+ package_data={
+ 'dbt.include.fabricspark': ["macros/**/*",'dbt_project.yml','profile_template.yml'],
+ },
+ include_package_data=True,
+ install_requires=["dbt-core==1.9.1", "dbt-fabric==1.9.0", "dbt-spark==1.9.0"],
+ zip_safe=False,
+ classifiers=[
+ "Development Status :: 5 - Production/Stable",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: Microsoft :: Windows",
+ "Operating System :: MacOS :: MacOS X",
+ "Operating System :: POSIX :: Linux",
+ "Programming Language :: Python :: 3.12",
+ ],
+ python_requires=">=3.8",
+)