Delta Exporter: Azure Support #7444

Merged (4 commits) on Feb 13, 2024
Changes from all commits
8 changes: 4 additions & 4 deletions .github/workflows/esti.yaml
@@ -900,8 +900,8 @@ jobs:
DOCKER_REG: ${{ needs.login-to-amazon-ecr.outputs.registry }}
LAKEFS_DATABASE_TYPE: postgres
LAKEFS_BLOCKSTORE_TYPE: azure
LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti
LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY2 }}
ESTI_AZURE_STORAGE_ACCOUNT: esti
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY }}
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}

@@ -954,11 +954,11 @@ jobs:
LAKEFS_DATABASE_COSMOSDB_CONTAINER: ${{ github.run_number }}-${{ steps.unique.outputs.value }}
LAKEFS_DATABASE_COSMOSDB_KEY: ${{ secrets.LAKEFS_DATABASE_COSMOSDB_READWRITEKEY }}
LAKEFS_BLOCKSTORE_TYPE: azure
LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti4hns
LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/
ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

- name: cleanup cosmos db container
if: always()
155 changes: 90 additions & 65 deletions docs/howto/hooks/lua.md
@@ -102,19 +102,30 @@ The Lua runtime embedded in lakeFS is limited for security reasons. The provided

Helper function to mark a table object as an array for the runtime by setting the `_is_array: true` metatable field.
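A conceptual sketch of the mechanism this helper wraps (the helper's heading is collapsed in this view, so the metatable field is set directly here):

```lua
local t = {}
-- an empty Lua table is ambiguous between an array and a map;
-- the `_is_array` metatable field tells the runtime to treat it as an array
setmetatable(t, { _is_array = true })
```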

### `aws/s3.get_object(bucket, key)`
### `aws`

### `aws/s3_client`
S3 client library.

```lua
local aws = require("aws")
-- pass valid AWS credentials
local client = aws.s3_client("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "REGION")
```

### `aws/s3_client.get_object(bucket, key)`

Returns the body of the requested object (as a Lua string) and a boolean value that is `true` if the requested object exists.
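A minimal usage sketch, assuming the `client` constructed above and a hypothetical bucket and key:

```lua
-- bucket and key are placeholders
local body, exists = client.get_object("my-bucket", "path/to/object.txt")
if exists then
    print("fetched " .. tostring(#body) .. " bytes")
end
```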

### `aws/s3.put_object(bucket, key, value)`
### `aws/s3_client.put_object(bucket, key, value)`

Writes the supplied string value to the object at the given bucket and key.

### `aws/s3.delete_object(bucket [, key])`
### `aws/s3_client.delete_object(bucket [, key])`

Deletes the object at the given key.

### `aws/s3.list_objects(bucket [, prefix, continuation_token, delimiter])`
### `aws/s3_client.list_objects(bucket [, prefix, continuation_token, delimiter])`

Returns a table of results containing the following structure:

@@ -143,7 +154,7 @@ or:
}
```
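A hedged pagination sketch; the field names (`results`, `next_continuation_token`, `is_truncated`) are assumptions read off the structure above, which is partially collapsed in this view:

```lua
-- walk all pages under a hypothetical prefix
local token = nil
repeat
    local resp = client.list_objects("my-bucket", "logs/", token)
    for _, entry in ipairs(resp.results) do
        print(entry.key)
    end
    token = resp.next_continuation_token
until not resp.is_truncated
```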

### `aws/s3.delete_recursive(bucket, prefix)`
### `aws/s3_client.delete_recursive(bucket, prefix)`

Deletes all objects under the given prefix.

@@ -196,6 +207,34 @@ The `table_input` is the same as the argument in the `glue.create_table` function.

Deletes an existing table in the Glue Catalog.

### `azure`

### `azure/blob_client`
Azure blob client library.

```lua
local azure = require("azure")
-- pass valid Azure credentials
local client = azure.blob_client("AZURE_STORAGE_ACCOUNT", "AZURE_ACCESS_KEY")
```

### `azure/blob_client.get_object(path_uri)`

Returns the body of the requested object (as a Lua string) and a boolean value that is `true` if the requested object exists.
`path_uri` - A valid Azure Blob Storage URI in the form `https://myaccount.blob.core.windows.net/mycontainer/myblob`.

### `azure/blob_client.put_object(path_uri, value)`

Writes the supplied string value to the object at the given `path_uri`.
`path_uri` - A valid Azure Blob Storage URI in the form `https://myaccount.blob.core.windows.net/mycontainer/myblob`.

### `azure/blob_client.delete_object(path_uri)`

Deletes the object at the given `path_uri`.
`path_uri` - A valid Azure Blob Storage URI in the form `https://myaccount.blob.core.windows.net/mycontainer/myblob`.
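A minimal round-trip sketch, assuming the `client` constructed above and a hypothetical account, container, and blob:

```lua
-- hypothetical blob URI
local uri = "https://myaccount.blob.core.windows.net/mycontainer/myblob"
client.put_object(uri, "hello world")
local body, exists = client.get_object(uri)
if exists then
    print(body) -- "hello world"
end
client.delete_object(uri)
```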

### `crypto`

### `crypto/aes/encryptCBC(key, plaintext)`

Returns the ciphertext of the AES-encrypted plaintext.
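A short sketch; the dotted call form is an assumption mirroring the module path, and the 16-byte key is a placeholder:

```lua
local crypto = require("crypto")
-- AES keys must be a valid length (e.g. 16 bytes); this one is a placeholder
local ciphertext = crypto.aes.encryptCBC("0123456789abcdef", "my secret payload")
```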
@@ -412,20 +451,20 @@ Parameters:

A package used to export Delta Lake tables from lakeFS to external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_names, writer, delta_client, table_descriptors_path)`
### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)`
> **Reviewer (Contributor):** Is this a breaking change? I'm not very familiar with named params in Lua.
>
> **Author (Member):** This was a bug in the documentation; the code did not change.

The function used to export Delta Lake tables.
The return value is a table mapping table names to external table locations (from which it is possible to query the data).

Parameters:

- `action`: The global action object
- `table_names`: Delta tables name list (e.g. `{"table1", "table2"}`)
- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `table_def_names`: A list of Delta table names (e.g. `{"table1", "table2"}`)
- `write_object`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3_client.put_object` or `azure/blob_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_names` reside
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_def_names` reside

Example:
Delta export example for AWS S3:

```yaml
---
@@ -444,7 +483,7 @@ hooks:
local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_defs, sc.put_object, delta_client, table_descriptors_path)

for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
@@ -457,7 +496,7 @@ hooks:
lakefs:
access_key_id: <LAKEFS_ACCESS_KEY_ID>
secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
table_names:
table_defs:
- mytable
```

@@ -469,6 +508,45 @@ type: delta
path: a/path/to/my/delta/table
```

Delta export example for Azure Blob Storage:

```yaml
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
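-- adapt export_delta_log's writer signature function(bucket, key, data)
-- to blob_client.put_object(path_uri, value): the bucket argument is
-- dropped and the key is passed through as the blob path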
local function write_object(_, key, buf)
return sc.put_object(key, buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, write_object, delta_client, table_descriptors_path)

for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
args:
azure:
storage_account: "{{ .AzureStorageAccount }}"
access_key: "{{ .AzureAccessKey }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_defs:
- mytable

```
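As in the S3 example, each name under `table_defs` is expected to have a matching descriptor under `_lakefs_tables/`; a sketch mirroring the descriptor shown above:

```yaml
# _lakefs_tables/mytable.yaml (hypothetical path; fields mirror the example above)
type: delta
path: a/path/to/my/delta/table
```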

### `lakefs/catalogexport/table_extractor`

Utility package to parse `_lakefs_tables/` descriptors.
@@ -606,59 +684,6 @@ Parameters:
- `descriptor(Table)`: The table descriptor object (e.g. from `_lakefs_tables/my_table.yaml`).
- `action_info(Table)`: The global action object.

### `lakefs/catalogexport/delta_exporter`

A package used to export Delta Lake tables from lakeFS to an external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_paths, writer, delta_client, table_descriptors_path)`

The function used to export Delta Lake tables.
The return value is a table with mapping of table names to external table location (from which it is possible to query the data).

Parameters:

- `action`: The global action object
- `table_paths`: Paths list in lakeFS to Delta Tables (e.g. `{"path/to/table1", "path/to/table2"}`)
- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_paths` reside

Example:

```yaml
---
name: delta_exporter
on:
post-commit: null
hooks:
- id: delta
type: lua
properties:
script: |
local aws = require("aws")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_paths, sc.put_object, delta_client, table_descriptors_path)

for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
args:
aws:
access_key_id: <AWS_ACCESS_KEY_ID>
secret_access_key: <AWS_SECRET_ACCESS_KEY>
region: us-east-1
lakefs:
access_key_id: <LAKEFS_ACCESS_KEY_ID>
secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
table_paths:
- my/delta/table/path
```

### `lakefs/catalogexport/unity_exporter`

A package used to register exported Delta Lake tables to Databricks' Unity catalog.