Fix: Delta export path unescape (#7473)
* Fix: Delta export path unescape

* CR Fixes
N-o-Z authored Feb 19, 2024
1 parent 8205736 commit 70f932c
Showing 6 changed files with 144 additions and 5 deletions.
108 changes: 106 additions & 2 deletions esti/catalog_export_test.go
@@ -423,7 +423,15 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s
namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json"
tableStat, err := client.StatObjectWithResponse(ctx, testData.Repository, mainBranch, &apigen.StatObjectParams{
Path: "tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet",
})
require.NoError(t, err)
require.NotNil(t, tableStat.JSON200)
expectedPath, err := url.Parse(tableStat.JSON200.PhysicalAddress)
require.NoError(t, err)

var reader io.ReadCloser
switch blockstoreType {
case block.BlockstoreTypeS3:
cfg, err := config.LoadDefaultConfig(ctx,
@@ -434,10 +442,11 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s

clt := s3.NewFromConfig(cfg)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(namespaceURL.Path, "/"), mainBranch, commit[:6])
- _, err = clt.HeadObject(ctx, &s3.HeadObjectInput{
+ readResp, err := clt.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(namespaceURL.Host),
Key: aws.String(key)})
require.NoError(t, err)
reader = readResp.Body

case block.BlockstoreTypeAzure:
azClient, err := azure.BuildAzureServiceClient(params.Azure{
Expand All @@ -448,12 +457,21 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s

containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6])
- _, err = azClient.NewContainerClient(containerName).NewBlobClient(key).GetProperties(ctx, nil)
+ readResp, err := azClient.NewContainerClient(containerName).NewBlockBlobClient(key).DownloadStream(ctx, nil)
require.NoError(t, err)
reader = readResp.Body

default:
t.Fatal("validation failed on unsupported block adapter")
}

defer func() {
err := reader.Close()
require.NoError(t, err)
}()
contents, err := io.ReadAll(reader)
require.NoError(t, err)
require.Contains(t, string(contents), expectedPath.String())
}

func TestDeltaCatalogExport(t *testing.T) {
@@ -509,3 +527,89 @@ func TestDeltaCatalogExport(t *testing.T)
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore)
}

func TestDeltaCatalogImportExport(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

requireBlockstoreType(t, block.BlockstoreTypeS3)
accessKeyID := viper.GetString("access_key_id")
secretAccessKey := viper.GetString("secret_access_key")
testData := &exportHooksTestData{
Repository: repo,
Branch: mainBranch,
LakeFSAccessKeyID: accessKeyID,
LakeFSSecretAccessKey: secretAccessKey,
}
blockstore := setupCatalogExportTestByStorageType(t, testData)
tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta")
require.NoError(t, err)
err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
buf, err := fs.ReadFile(tmplDir, path)
if err != nil {
return err
}
uploadToPhysicalAddress(t, ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
}
return nil
})
require.NoError(t, err)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, fmt.Sprintf("%s/_lakefs_actions/delta_export.yaml", blockstore)),
})

runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1)
run := runs.Results[0]
require.Equal(t, "completed", run.Status)

amount := apigen.PaginationAmount(1)
tasks, err := client.ListRunHooksWithResponse(ctx, repo, run.RunId, &apigen.ListRunHooksParams{
Amount: &amount,
})
require.NoError(t, err)
require.NotNil(t, tasks.JSON200)
require.Equal(t, 1, len(tasks.JSON200.Results))
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore)
}

func uploadToPhysicalAddress(t *testing.T, ctx context.Context, repo, branch, objPath, objContent string) {
t.Helper()
physicalAddress, err := url.Parse(getStorageNamespace(t, ctx, repo))
require.NoError(t, err)
physicalAddress = physicalAddress.JoinPath("data", objPath)

adapter, err := NewAdapter(physicalAddress.Scheme)
require.NoError(t, err)

stats, err := adapter.Upload(ctx, physicalAddress, strings.NewReader(objContent))
require.NoError(t, err)

mtime := stats.MTime.Unix()
unescapedAddress, err := url.PathUnescape(physicalAddress.String()) // catch: https://github.com/treeverse/lakeFS/issues/7460
require.NoError(t, err)

resp, err := client.StageObjectWithResponse(ctx, repo, branch, &apigen.StageObjectParams{
Path: objPath,
}, apigen.StageObjectJSONRequestBody{
Checksum: stats.ETag,
Mtime: &mtime,
PhysicalAddress: unescapedAddress,
SizeBytes: stats.Size,
})
require.NoError(t, err)
require.NotNil(t, resp.JSON201)
}

func getStorageNamespace(t *testing.T, ctx context.Context, repo string) string {
t.Helper()
resp, err := client.GetRepositoryWithResponse(ctx, repo)
require.NoError(t, err)
require.NotNil(t, resp.JSON200)
return resp.JSON200.StorageNamespace
}
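
A note on the unescape step above (the "catch" comment pointing at issue #7460): url.URL renders its path percent-escaped when stringified, so an address built with JoinPath for a key containing a space comes back with test%20partition, while the adapter uploaded the object under the raw key. A minimal sketch of that round-trip, using a hypothetical bucket and namespace:

package main

import (
	"fmt"
	neturl "net/url"
)

func main() {
	// Hypothetical storage namespace and object key, for illustration only.
	base, err := neturl.Parse("s3://example-bucket/repo-namespace")
	if err != nil {
		panic(err)
	}
	addr := base.JoinPath("data", "tables", "test-table", "test partition", "part-0.parquet")

	// String() percent-escapes the path, so it no longer matches the raw key
	// the object was uploaded under.
	fmt.Println(addr.String())
	// s3://example-bucket/repo-namespace/data/tables/test-table/test%20partition/part-0.parquet

	// Unescaping restores the raw key before the address is staged.
	unescaped, err := neturl.PathUnescape(addr.String())
	if err != nil {
		panic(err)
	}
	fmt.Println(unescaped)
	// s3://example-bucket/repo-namespace/data/tables/test-table/test partition/part-0.parquet
}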
@@ -1,4 +1,4 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"db5e0917-1716-4b0f-a009-c25e5b7304a1","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"registration_dttm\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"email\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ip_address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cc\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthdate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"comments\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__index_level_0__\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1707066829815,"configuration":{}}}
{"add":{"path":"0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"[email protected]\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"[email protected]\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"add":{"path":"test%20partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"[email protected]\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"[email protected]\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1707066829820,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"created_time\":1707066829815,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"db5e0917-1716-4b0f-a009-c25e5b7304a1\",\"name\":null,\"partition_columns\":[],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"registration_dttm\",\"nullable\":true,\"type\":\"timestamp\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"first_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"last_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"email\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"gender\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"ip_address\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"cc\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"country\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"birthdate\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"salary\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"title\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"comments\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"__index_level_0__\",\"nullable\":true,\"type\":\"long\"}],\"type\":\"struct\"}}","mode":"ErrorIfExists","location":"s3a://delta-lake-demo/main/data"},"clientVersion":"delta-rs.0.17.0"}}
8 changes: 7 additions & 1 deletion pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
@@ -111,7 +111,13 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
local code, obj = lakefs.stat_object(repo, commit_id, unescaped_path)
if code == 200 then
local obj_stat = json.unmarshal(obj)
- local physical_path = obj_stat["physical_address"]
+ --[[
+ This code block handles escaping of the physical address' path part.
+ Since we don't want to escape the entire URL (i.e. scheme, host), we parse the URL and rebuild it.
+ Building the URL will then handle any escaping needed on the relevant parts.
+ ]]
+ local u = url.parse(obj_stat["physical_address"])
+ local physical_path = url.build_url(u["scheme"], u["host"], u["path"])
if entry.add ~= nil then
entry.add.path = physical_path
elseif entry.remove ~= nil then
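
The parse-and-rebuild step in the hook above maps onto Go's net/url, which backs the Lua url library here; a minimal sketch of the equivalent flow, with a hypothetical physical address:

package main

import (
	"fmt"
	neturl "net/url"
)

func main() {
	// A physical address as lakefs.stat_object might return it, with an
	// unescaped space in the path (hypothetical value).
	raw := "s3://example-bucket/data/test partition/0-845b8a42-0.parquet"

	u, err := neturl.Parse(raw)
	if err != nil {
		panic(err)
	}

	// Rebuild from scheme, host, and path only: String() escapes the path
	// while leaving the scheme and host untouched.
	rebuilt := neturl.URL{Scheme: u.Scheme, Host: u.Host, Path: u.Path}
	fmt.Println(rebuilt.String())
	// s3://example-bucket/data/test%20partition/0-845b8a42-0.parquet
}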
2 changes: 1 addition & 1 deletion pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
@@ -40,7 +40,7 @@ end
local function get_table_descriptor(client, repo_id, commit_id, logical_path)
local code, content = client.get_object(repo_id, commit_id, logical_path)
if code ~= 200 then
error("could not fetch data file: HTTP " .. tostring(code) , "path: ", logical_path)
error("could not fetch data file: HTTP " .. tostring(code) .. " path: " .. logical_path)
end
local descriptor = yaml.unmarshal(content)
descriptor.partition_columns = descriptor.partition_columns or {}
29 changes: 29 additions & 0 deletions pkg/actions/lua/net/url/url.go
@@ -19,6 +19,8 @@ func Open(l *lua.State) {
var library = []lua.RegistryFunction{
{Name: "parse", Function: parse},
{Name: "query_unescape", Function: queryUnescape},
{Name: "path_escape", Function: pathEscape},
{Name: "build_url", Function: build},
}

func parse(l *lua.State) int {
@@ -47,3 +49,30 @@ func queryUnescape(l *lua.State) int {
l.PushString(qu)
return 1
}

func pathEscape(l *lua.State) int {
path := lua.CheckString(l, 1)
ep := neturl.PathEscape(path)
l.PushString(ep)
return 1
}

func build(l *lua.State) int {
scheme := lua.CheckString(l, 1)
host := lua.CheckString(l, 2)
u := neturl.URL{
Scheme: scheme,
Host: host,
}
if !l.IsNone(3) {
u.Path = lua.CheckString(l, 3)
}
if !l.IsNone(4) {
u.RawQuery = lua.CheckString(l, 4)
}
if !l.IsNone(5) {
u.Fragment = lua.CheckString(l, 5)
}
l.PushString(u.String())
return 1
}
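
Both new bindings delegate to net/url, so their behavior follows from it. A small sketch of the expected equivalents (outputs are assumptions based on net/url's documented escaping rules):

package main

import (
	"fmt"
	neturl "net/url"
)

func main() {
	// path_escape wraps neturl.PathEscape, which escapes slashes as well,
	// so it is meant for a single path segment rather than a whole path.
	fmt.Println(neturl.PathEscape("test partition")) // test%20partition
	fmt.Println(neturl.PathEscape("a/b"))            // a%2Fb

	// build_url assembles a URL from scheme, host, and optional path, query,
	// and fragment; URL.String applies the escaping.
	u := neturl.URL{
		Scheme:   "s3",
		Host:     "bucket",
		Path:     "prefix/test partition/file.parquet",
		RawQuery: "versionId=abc",
	}
	fmt.Println(u.String())
	// s3://bucket/prefix/test%20partition/file.parquet?versionId=abc
}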
