3 changes: 3 additions & 0 deletions .gitignore
@@ -315,3 +315,6 @@ $RECYCLE.BIN/

# End of https://www.toptal.com/developers/gitignore/api/windows,macos,linux,visualstudiocode,python
test.env

# Temp
.temp/*
28 changes: 28 additions & 0 deletions README.md
@@ -40,6 +40,34 @@ fabric-spark-test:
retry_all: true
```

### Session Reuse

By default, the adapter reuses Livy sessions across dbt runs to avoid the overhead of creating new sessions each time. Session IDs are persisted to a file so they can be reused in subsequent runs.

**Configuration options:**

- `session_id_file` (optional): Path to the file storing the Livy session ID. Defaults to `./livy-session-id.txt` in the current working directory.

Example with custom session file:

```yaml
fabric-spark-test:
target: fabricspark-dev
outputs:
fabricspark-dev:
# ... other settings ...
session_id_file: /path/to/my-session-id.txt
```

**Session reuse behavior:**

1. On first run: Creates a new Livy session and saves the session ID to the file
2. On subsequent runs: Reads the session ID from file and attempts to reuse it
3. If the session is invalid (dead, stopped, or doesn't exist): Creates a new session and updates the file
4. Sessions are intentionally kept alive after dbt exits for reuse
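
A simplified sketch of this flow (the `livy` client and its method names here are hypothetical stand-ins for the adapter's internals):

```python
import os

def get_or_create_session(livy, session_id_file: str) -> str:
    """Return a reusable Livy session ID, creating a new session if needed.

    `livy` is a hypothetical client exposing get_state()/create_session();
    the adapter's real internals may differ.
    """
    if os.path.exists(session_id_file):
        with open(session_id_file) as f:
            session_id = f.read().strip()
        # Reuse the persisted session only if it is still alive.
        if session_id and livy.get_state(session_id) not in ("dead", "killed", "error", None):
            return session_id

    # Missing or invalid session: create a new one and persist its ID for the next run.
    session_id = livy.create_session()
    with open(session_id_file, "w") as f:
        f.write(session_id)
    return session_id
```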

To force a new session, simply delete the session ID file before running dbt.
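
For example:

```bash
rm -f ./livy-session-id.txt   # or the custom path set in `session_id_file`
dbt run                       # creates a fresh session and rewrites the file
```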

### Reporting bugs and contributing code

- Want to report a bug or request a feature? Let us know on [Slack](http://slack.getdbt.com/), or open [an issue](https://github.com/microsoft/dbt-fabricspark/issues/new).
58 changes: 58 additions & 0 deletions contrib/README.md
@@ -0,0 +1,58 @@
# Contributing

## How to set up a Linux dev environment on Windows, via WSL

1. Install the Windows pre-reqs:

```powershell
winget install -e --id Microsoft.VisualStudioCode
```

2. Spin up a fresh WSL machine:

```powershell
$GIT_ROOT = git rev-parse --show-toplevel
& "$GIT_ROOT\contrib\bootstrap-dev-env.ps1"
```

3. Clone the repo and open VS Code in it:

```bash
cd ~/

git config --global user.name "Raki Rahman"
git config --global user.email "mdrakiburrahman@gmail.com"
git clone https://github.com/mdrakiburrahman/dbt-fabricspark.git

cd dbt-fabricspark/
code .
```

4. Run the bootstrapper script, which installs all tools idempotently:

```bash
GIT_ROOT=$(git rev-parse --show-toplevel)
chmod +x ${GIT_ROOT}/contrib/bootstrap-dev-env.sh && ${GIT_ROOT}/contrib/bootstrap-dev-env.sh
```

5. Source your shell profile to apply the environment changes:

```bash
source ~/.bashrc
```

6. Dev loop:

```bash
# Build wheel
rm -rf "$(git rev-parse --show-toplevel)/dist"
uv build

# Run unit tests
uv run pytest -v

# Upload to storage, and install wheel on client machine
pip uninstall -y dbt-fabricspark 2>/dev/null
pip install https://rakirahman.blob.core.windows.net/public/whls/dbt_fabricspark-1.9.1-py3-none-any.whl
```
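
The upload step itself isn't shown; a minimal sketch with the Azure CLI, assuming the storage account (`rakirahman`) and container (`public`) implied by the wheel URL above:

```bash
# Upload the freshly built wheel (account/container inferred from the URL above)
az storage blob upload \
  --account-name rakirahman \
  --container-name public \
  --name whls/dbt_fabricspark-1.9.1-py3-none-any.whl \
  --file dist/dbt_fabricspark-1.9.1-py3-none-any.whl \
  --overwrite
```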
34 changes: 34 additions & 0 deletions contrib/bootstrap-dev-env.ps1
@@ -0,0 +1,34 @@
<#

.SYNOPSIS
Bootstraps a Windows Cloud DevBox with WSL pre-reqs.

.NOTES

- The script unregisters any existing Ubuntu-24.04 distro before reinstalling it, so back up anything you need from that distro first.

#>

if (wsl -l -q | Select-String -SimpleMatch "Ubuntu-24.04") {
Write-Host "Unregistering Ubuntu-24.04"
wsl --unregister Ubuntu-24.04
}

$memGB=[math]::Floor((Get-CimInstance Win32_ComputerSystem).TotalPhysicalMemory/1GB)
$cpu=[Environment]::ProcessorCount
$swap=[math]::Floor($memGB/4)
@"
[wsl2]
memory=${memGB}GB
processors=$cpu
swap=${swap}GB
networkingMode=NAT
"@ | Set-Content -Path "$env:USERPROFILE\.wslconfig"

Write-Host "Restarting WSL to apply settings"
wsl --shutdown

winget install -e --id Microsoft.GitCredentialManagerCore

Write-Host "Installing Ubuntu-24.04"
wsl --install -d Ubuntu-24.04
36 changes: 36 additions & 0 deletions contrib/bootstrap-dev-env.sh
@@ -0,0 +1,36 @@
#!/bin/bash
#
#
# Sets up a dev env with all pre-reqs.
#
# This script is idempotent: it only installs dependencies
# that are not already present.
#
# ---------------------------------------------------------------------------------------
#
set -e

export REPO_ROOT=$(git rev-parse --show-toplevel)
export DEBIAN_FRONTEND=noninteractive

PACKAGES=""
command -v python &>/dev/null || PACKAGES="python3 python-is-python3 python3-venv"
command -v pip &>/dev/null || PACKAGES="$PACKAGES python3-pip"
command -v curl &>/dev/null || PACKAGES="$PACKAGES curl"

[ -n "$PACKAGES" ] && sudo apt-get update -qq && sudo apt-get install -yqq $PACKAGES

command -v uv &>/dev/null || { curl -LsSf https://astral.sh/uv/install.sh | sh; source "$HOME/.local/bin/env" 2>/dev/null || true; }
command -v az &>/dev/null || curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash

[[ ":$PATH:" != *":$HOME/.local/bin:"* ]] && export PATH="$HOME/.local/bin:$PATH"

cd "$REPO_ROOT"
[ ! -d "$REPO_ROOT/.venv" ] && uv venv "$REPO_ROOT/.venv" || true
source "$REPO_ROOT/.venv/bin/activate"
uv pip install -e . --group dev
[ ! -f "$REPO_ROOT/test.env" ] && cp "$REPO_ROOT/test.env.example" "$REPO_ROOT/test.env" || true

code --install-extension donjayamanne.python-extension-pack

echo "Done. Python: $(python --version), uv: $(uv --version), az: $(az version -o tsv 2>/dev/null | head -1)"
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -86,7 +86,7 @@ filterwarnings = [
"ignore:.*datetime.datetime.utcnow.*:DeprecationWarning", # https://github.com/dbt-labs/dbt-core/issues/9791
]
env_files = ["test.env"]
testpaths = ["tests/unit", "tests/functional"]
testpaths = ["tests/unit"]

[tool.ruff]
# Target Python versions
2 changes: 1 addition & 1 deletion src/dbt/adapters/fabricspark/__version__.py
@@ -1 +1 @@
version = "1.9.0"
version = "1.9.1"
8 changes: 6 additions & 2 deletions src/dbt/adapters/fabricspark/connections.py
@@ -208,11 +208,15 @@ def release(self) -> None:

@classmethod
def cleanup_all(self) -> None:
"""Clean up all connection managers without closing Livy sessions.

Sessions are intentionally kept alive for reuse by subsequent dbt runs.
"""
for thread_id in self.connection_managers:
livySession = self.connection_managers[thread_id]
livySession.disconnect()
livySession.disconnect() # This no longer deletes the Livy session

# garbage collect these connections
# garbage collect these connection manager references
self.connection_managers.clear()

@classmethod
52 changes: 45 additions & 7 deletions src/dbt/adapters/fabricspark/credentials.py
@@ -1,5 +1,6 @@
import os
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Literal, Optional, Tuple

from dbt_common.exceptions import DbtRuntimeError

@@ -8,16 +9,24 @@

logger = AdapterLogger("fabricspark")

# Mode types for Livy connection
LivyMode = Literal["fabric", "local"]

# Default session ID file name
DEFAULT_SESSION_ID_FILENAME = "livy-session-id.txt"


@dataclass
class FabricSparkCredentials(Credentials):
schema: Optional[str] = None # type: ignore
method: str = "livy"
livy_mode: LivyMode = "fabric" # "fabric" or "local"
workspaceid: Optional[str] = None
database: Optional[str] = None # type: ignore
lakehouse: Optional[str] = None
lakehouseid: Optional[str] = None
endpoint: str = "https://msitapi.fabric.microsoft.com/v1"
endpoint: Optional[str] = "https://api.fabric.microsoft.com/v1" # Required for Fabric mode, optional for local mode
livy_url: str = "http://localhost:8998" # Local Livy URL
client_id: Optional[str] = None
client_secret: Optional[str] = None
tenant_id: Optional[str] = None
@@ -30,6 +39,8 @@ class FabricSparkCredentials(Credentials):
lakehouse_schemas_enabled: bool = False
accessToken: Optional[str] = None
spark_config: Dict[str, Any] = field(default_factory=dict)
# Optional path to session ID file for session reuse. If not provided, defaults to ./livy-session-id.txt
session_id_file: Optional[str] = None

@classmethod
def __pre_deserialize__(cls, data: Any) -> Any:
@@ -38,24 +49,49 @@ def __pre_deserialize__(cls, data: Any) -> Any:
data["lakehouse"] = None
return data

@property
def is_local_mode(self) -> bool:
"""Check if running in local Livy mode."""
return self.livy_mode == "local"

@property
def resolved_session_id_file(self) -> str:
"""Get the resolved path to the session ID file.

If session_id_file is provided, use it. Otherwise, use the default
file name in the current working directory.
"""
if self.session_id_file:
return self.session_id_file
return os.path.join(os.getcwd(), DEFAULT_SESSION_ID_FILENAME)

@property
def lakehouse_endpoint(self) -> str:
# TODO: Construct Endpoint of the lakehouse from the
"""Get the Livy endpoint URL based on mode."""
if self.is_local_mode:
return self.livy_url
# Fabric mode: Construct Endpoint of the lakehouse
return f"{self.endpoint}/workspaces/{self.workspaceid}/lakehouses/{self.lakehouseid}/livyapi/versions/2023-12-01"

def __post_init__(self) -> None:
if self.method is None:
raise DbtRuntimeError("Must specify `method` in profile")
if self.workspaceid is None:
raise DbtRuntimeError("Must specify `workspace guid` in profile")
if self.lakehouseid is None:
raise DbtRuntimeError("Must specify `lakehouse guid` in profile")
if self.schema is None:
raise DbtRuntimeError("Must specify `schema` in profile")
if self.database is not None:
raise DbtRuntimeError(
"database property is not supported by adapter. Set database as none and use lakehouse instead."
)

# Fabric-specific validations (only when not in local mode)
if not self.is_local_mode:
if self.endpoint is None:
raise DbtRuntimeError("Must specify `endpoint` in profile for Fabric mode")
if self.workspaceid is None:
raise DbtRuntimeError("Must specify `workspaceid` in profile for Fabric mode")
if self.lakehouseid is None:
raise DbtRuntimeError("Must specify `lakehouseid` in profile for Fabric mode")

if self.lakehouse_schemas_enabled and self.schema is None:
raise DbtRuntimeError(
"Please provide a schema name because you enabled lakehouse schemas"
@@ -78,6 +114,8 @@ def type(self) -> str:

@property
def unique_field(self) -> str:
if self.is_local_mode:
return self.livy_url
return self.lakehouseid

def _connection_keys(self) -> Tuple[str, ...]: