Skip to content

Commit 1e7cf54

Browse files
authored
Support v2 "async" transactions returning multipart Arrow responses (#17)
* Support v2 "async" transactions returning multipart Arrow responses Part 1: WIP basic GET/PUT calls for the v2 API. * Parse the MultiPartForms + Arrow results into Arrow.Tables[] * Add tests for v2 async/sync responses * Update src/api.jl * Add support for v1_inputs to v2-style exec :) (#23) * Add support for v1_inputs to v2-style exec :) * Adjust to latest /problems endpoint implementation from frontend * Better comment + version patch on parse_multipart_form * clarify test * Rename exec to exec_async * Fixups: - support empty responses - export the new names - add an "is finished" function * Delete the struct ResultPhysicalRelation, and just return a pair * prefer haskey * Switch back to returning a Dict, since i think it's more normal. * Add examples/exec_async.jl: ```bash $ julia --proj examples/exec_async.jl "nhd-test-1" "nhd-s" "def x = 1 def x = x+1 def output = x" Transaction is created... JSON3.Object{Vector{UInt8}, Vector{UInt64}} with 2 entries: :id => "c616a8ae-3be2-0d51-fb0e-b7f862bc367d" :state => "CREATED" ``` * Fix bug in transaction_is_done after switching to dicts
1 parent f16e712 commit 1e7cf54

File tree

6 files changed

+308
-2
lines changed

6 files changed

+308
-2
lines changed

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ version = "0.0.1"
44

55
[deps]
66
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
7+
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
78
ConfParser = "88353bc9-fd38-507d-a820-d3b43837d6b9"
89
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
910
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
1011
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
12+
Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533"
1113
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
1214

1315
[compat]

examples/exec_async.jl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2022 RelationalAI, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License
14+
15+
# Execute the given query string.
16+
# Example:
17+
# $ julia --proj=. examples/exec_async.jl "nhd-test-1" "nhd-s" "2+2"
18+
# Transaction is done...
19+
# JSON3.Object{Vector{UInt8}, Vector{UInt64}} with 2 entries:
20+
# :id => "261f5d59-f1b4-f778-4c13-a7993871c972"
21+
# :state => "CREATED"
22+
23+
import RAI
24+
using RAI: Context, HTTPError, exec_async, load_config, show_result, get_transaction
25+
26+
include("parseargs.jl")
27+
28+
function run(database, engine, source; profile)
29+
conf = load_config(; profile = profile)
30+
ctx = Context(conf)
31+
txn = exec_async(ctx, database, engine, source)
32+
println("Transaction is created...")
33+
display(txn)
34+
println()
35+
end
36+
37+
function main()
38+
args = parseargs(
39+
"database", Dict(:help => "database name", :required => true),
40+
"engine", Dict(:help => "engine name", :required => true),
41+
"command", Dict(:help => "rel source string"),
42+
["--file", "-f"], Dict(:help => "rel source file"),
43+
"--readonly", Dict(:help => "readonly query (default: false)", :action => "store_true"),
44+
"--profile", Dict(:help => "config profile (default: default)"))
45+
try
46+
source = nothing
47+
if !isnothing(args.command)
48+
source = args.command
49+
elseif !isnothing(args.file)
50+
source = open(args.file, "r")
51+
end
52+
isnothing(source) && return # nothing to execute
53+
run(args.database, args.engine, source; profile = args.profile)
54+
catch e
55+
e isa HTTPError ? show(e) : rethrow()
56+
end
57+
end
58+
59+
main()

src/RAI.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,17 @@ export
6969

7070
export
7171
exec,
72+
exec_async,
7273
list_edbs,
7374
load_csv,
7475
load_json
7576

77+
export
78+
get_transaction,
79+
get_transaction_metadata,
80+
get_transaction_problems,
81+
get_transaction_results
82+
7683
export
7784
Relation,
7885
TransactionResult,

src/api.jl

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
# functionality as direclty as possible, but in a way that is natural for the
1919
# Julia language.
2020

21+
#using Infiltrator
2122
import JSON3
23+
import Arrow
24+
25+
using Mocking: Mocking, @mock # For unit testing, by mocking API server responses
2226

2327
const PATH_DATABASE = "/database"
2428
const PATH_ENGINE = "/compute"
2529
const PATH_OAUTH_CLIENTS = "/oauth-clients"
2630
const PATH_TRANSACTION = "/transaction"
31+
const PATH_ASYNC_TRANSACTIONS = "/transactions"
2732
const PATH_USERS = "/users"
2833

2934
struct HTTPError <: Exception
@@ -57,7 +62,7 @@ function _mkurl(ctx::Context, path)
5762
end
5863

5964
function _print_request(method, path, query, body)
60-
println("$method $path")
65+
println("$method $path")
6166
!isnothing(query) && for (k, v) in query
6267
println("$k: $v")
6368
end
@@ -338,7 +343,7 @@ end
338343

339344
# Execute the given query string, using any given optioanl query inputs.
340345
# todo: consider create_transaction
341-
# todo: consider create_transaction to better align with future transaciton
346+
# todo: consider create_transaction to better align with future transaction
342347
# resource model
343348
function exec(ctx::Context, database::AbstractString, engine::AbstractString, source; inputs = nothing, readonly = false, kw...)
344349
source isa IO && (source = read(source, String))
@@ -349,6 +354,113 @@ end
349354
# todo: when we have async transactions, add a variation that dispatches and
350355
# waits .. consider creating two entry points for readonly and readwrite.
351356

357+
function exec_async(ctx::Context, database::AbstractString, engine::AbstractString, source; inputs = nothing, readonly = false, kw...)
358+
source isa IO && (source = read(source, String))
359+
tx_body = Dict(
360+
"dbname" => database,
361+
"engine_name" => engine,
362+
"query" => source,
363+
#"nowait_durable" => self.nowait_durable, # TODO: currently unsupported
364+
"readonly" => readonly,
365+
# "sync_mode" => "async"
366+
)
367+
if inputs !== nothing
368+
tx_body["v1_inputs"] = [_make_query_action_input(k, v) for (k, v) in inputs]
369+
end
370+
body = JSON3.write(tx_body)
371+
path = _mkurl(ctx, PATH_ASYNC_TRANSACTIONS)
372+
rsp = @mock request(ctx, "POST", path; body = body, kw...)
373+
return _parse_response(rsp)
374+
end
375+
376+
function _parse_response(rsp)
377+
content_type = HTTP.header(rsp, "Content-Type")
378+
if lowercase(content_type) == "application/json"
379+
content = HTTP.body(rsp)
380+
# async mode
381+
return JSON3.read(content)
382+
elseif occursin("multipart/form-data", lowercase(content_type))
383+
# sync mode
384+
return _parse_multipart_fastpath_sync_response(rsp)
385+
else
386+
error("Unknown response content-type, for response:\n$(rsp)")
387+
end
388+
end
389+
390+
function get_transaction(ctx::Context, id::AbstractString; kw...)
391+
path = PATH_ASYNC_TRANSACTIONS * "/$id"
392+
rsp = _get(ctx, path; kw...)
393+
return rsp.transaction
394+
end
395+
396+
function transaction_is_done(txn)
397+
if haskey(txn, "transaction")
398+
txn = txn["transaction"]
399+
end
400+
return txn["state"] ("COMPLETED", "ABORTED")
401+
end
402+
403+
function get_transaction_metadata(ctx::Context, id::AbstractString; kw...)
404+
path = PATH_ASYNC_TRANSACTIONS * "/$id/metadata"
405+
rsp = _get(ctx, path; kw...)
406+
return rsp
407+
end
408+
409+
function get_transaction_problems(ctx::Context, id::AbstractString; kw...)
410+
path = PATH_ASYNC_TRANSACTIONS * "/$id/problems"
411+
rsp = _get(ctx, path; kw...)
412+
return rsp
413+
end
414+
415+
function get_transaction_results(ctx::Context, id::AbstractString; kw...)
416+
path = PATH_ASYNC_TRANSACTIONS * "/$id/results"
417+
path = _mkurl(ctx, path)
418+
rsp = request(ctx, "GET", path; kw...)
419+
content_type = HTTP.header(rsp, "Content-Type")
420+
if !occursin("multipart/form-data", content_type)
421+
throw(HTTPError(400, "Unexpected response content-type for rsp:\n$rsp"))
422+
end
423+
return _parse_multipart_results_response(rsp)
424+
end
425+
426+
function _parse_multipart_fastpath_sync_response(msg)
427+
# TODO: in-place conversion to Arrow without copying the bytes.
428+
# ... HTTP.parse_multipart_form() copies the bytes into IOBuffers.
429+
parts = _parse_multipart_form(msg)
430+
@assert parts[1].name == "transaction"
431+
@assert parts[2].name == "metadata"
432+
433+
problems_idx = findfirst(p->p.name == "problems", parts)
434+
problems = JSON3.read(parts[problems_idx])
435+
436+
results_start_idx = findfirst(p->startswith(p.name, '/'), parts)
437+
if results_start_idx === nothing
438+
results = []
439+
else
440+
results = _extract_multipart_results_response(@view(parts[results_start_idx:end]))
441+
end
442+
443+
return Dict(
444+
"transaction" => JSON3.read(parts[1]),
445+
"metadata" => JSON3.read(parts[2]),
446+
"problems" => problems,
447+
"results" => results,
448+
)
449+
end
450+
451+
function _parse_multipart_results_response(msg)
452+
# TODO: in-place conversion to Arrow without copying the bytes.
453+
# ... HTTP.parse_multipart_form() copies the bytes into IOBuffers.
454+
parts = _parse_multipart_form(msg)
455+
return _extract_multipart_results_response(parts)
456+
end
457+
function _extract_multipart_results_response(parts)
458+
return [
459+
(part.name => Arrow.Table(part.data)) for part in parts
460+
]
461+
end
462+
463+
352464
function list_edbs(ctx::Context, database::AbstractString, engine::AbstractString; kw...)
353465
tx = Transaction(ctx.region, database, engine, "OPEN"; readonly = true)
354466
data = body(tx, _make_list_edb_action())
@@ -444,3 +556,27 @@ function load_model(ctx::Context, database::AbstractString, engine::AbstractStri
444556
actions = [_make_load_model_action(name, model) for (name, model) in models]
445557
return _post(ctx, PATH_TRANSACTION; query = query(tx), body = body(tx, actions...), kw...)
446558
end
559+
560+
561+
562+
# --- utils -------------------------
563+
# Patch for older versions of HTTP package that don't support parsing multipart responses:
564+
if hasmethod(HTTP.MultiPartParsing.parse_multipart_form, (HTTP.Response,))
565+
# Available as of HTTP v0.9.18:
566+
_parse_multipart_form = HTTP.MultiPartParsing.parse_multipart_form
567+
else
568+
# This function is copied directly from this PR: https://github.com/JuliaWeb/HTTP.jl/pull/817
569+
function _parse_multipart_form(msg::HTTP.Message)
570+
# parse boundary from Content-Type
571+
m = match(r"multipart/form-data; boundary=(.*)$", msg["Content-Type"])
572+
m === nothing && return nothing
573+
574+
boundary_delimiter = m[1]
575+
576+
# [RFC2046 5.1.1](https://tools.ietf.org/html/rfc2046#section-5.1.1)
577+
length(boundary_delimiter) > 70 && error("boundary delimiter must not be greater than 70 characters")
578+
579+
return HTTP.MultiPartParsing.parse_multipart_body(HTTP.payload(msg), boundary_delimiter)
580+
end
581+
end
582+
# -----------------------------------

test/api.jl

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using RAI
2+
using Test
3+
import HTTP, Arrow
4+
using Mocking
5+
6+
Mocking.activate()
7+
8+
# -----------------------------------
9+
# v2 transactions
10+
11+
make_patch(response) = @patch RAI.request(ctx::Context, args...; kw...) = response
12+
13+
const v2_async_response = HTTP.Response(200, [
14+
"Content-Type" => "application/json",
15+
],
16+
body = """{"id":"1fc9001b-1b88-8685-452e-c01bc6812429","state":"CREATED"}""")
17+
18+
const v2_get_results_response() = join([
19+
"--8a89e52be8efe57f0b68ea75388314a3",
20+
"Content-Disposition: form-data; name=\"/:output/Int64\"; filename=\"/:output/Int64\"",
21+
"Content-Type: application/vnd.apache.arrow.stream",
22+
"",
23+
"\xff\xff\xff\xffx\0\0\0\x10\0\0\0\0\0\n\0\f\0\n\0\b\0\x04\0\n\0\0\0\x10\0\0\0\x01\0\x04\0\b\0\b\0\0\0\x04\0\b\0\0\0\x04\0\0\0\x01\0\0\0\x14\0\0\0\x10\0\x14\0\x10\0\0\0\x0e\0\b\0\0\0\x04\0\x10\0\0\0\x10\0\0\0\x18\0\0\0\0\0\x02\0\x1c\0\0\0\0\0\0\0\b\0\f\0\b\0\a\0\b\0\0\0\0\0\0\x01@\0\0\0\x02\0\0\0v1\0\0\xff\xff\xff\xff\x88\0\0\0\x14\0\0\0\0\0\0\0\f\0\x16\0\x14\0\x12\0\f\0\x04\0\f\0\0\0\b\0\0\0\0\0\0\0\x14\0\0\0\0\0\x03\0\x04\0\n\0\x18\0\f\0\b\0\x04\0\n\0\0\0\x14\0\0\08\0\0\0\x01\0\0\0\0\0\0\0\0\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\b\0\0\0\0\0\0\0\0\0\0\0\x01\0\0\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0\xff\xff\xff\xff\0\0\0\0",
24+
], "\r\n")
25+
26+
const v2_fastpath_response = HTTP.Response(200, [
27+
"Content-Type" => "Content-Type: multipart/form-data; boundary=8a89e52be8efe57f0b68ea75388314a3",
28+
"Transfer-Encoding" => "chunked",
29+
],
30+
body = join([
31+
"",
32+
"--8a89e52be8efe57f0b68ea75388314a3",
33+
"Content-Disposition: form-data; name=\"transaction\"; filename=\"\"",
34+
"Content-Type: application/json",
35+
"",
36+
"""{"id":"a3e3bc91-0a98-50ba-733c-0987e160eb7d","results_format_version":"2.0.1","state":"COMPLETED"}""",
37+
"--8a89e52be8efe57f0b68ea75388314a3",
38+
"Content-Disposition: form-data; name=\"metadata\"; filename=\"\"",
39+
"Content-Type: application/json",
40+
"",
41+
"""[{"relationId":"/:output/Int64","types":[":output","Int64"]}]""",
42+
"--8a89e52be8efe57f0b68ea75388314a3",
43+
"Content-Disposition: form-data; name=\"problems\"; filename=\"\"",
44+
"Content-Type: application/json",
45+
"",
46+
"""[]""",
47+
v2_get_results_response(),
48+
"--8a89e52be8efe57f0b68ea75388314a3--",
49+
"",
50+
], "\r\n"))
51+
52+
function make_arrow_table(vals)
53+
io = IOBuffer()
54+
Arrow.write(io, (v1=vals,))
55+
seekstart(io)
56+
return Arrow.Table(io)
57+
end
58+
59+
@testset "exec_async" begin
60+
ctx = Context("region", "scheme", "host", "2342", nothing)
61+
62+
@testset "async response" begin
63+
patch = make_patch(v2_async_response)
64+
65+
apply(patch) do
66+
rsp = RAI.exec_async(ctx, "engine", "database", "2+2")
67+
@test rsp == JSON3.read("""{"id":"1fc9001b-1b88-8685-452e-c01bc6812429","state":"CREATED"}""")
68+
end
69+
end
70+
71+
@testset "sync response" begin
72+
patch = make_patch(v2_fastpath_response)
73+
74+
apply(patch) do
75+
rsp = RAI.exec_async(ctx, "engine", "database", "2+2")
76+
@test rsp["transaction"] == JSON3.read("""{
77+
"id": "a3e3bc91-0a98-50ba-733c-0987e160eb7d",
78+
"results_format_version": "2.0.1",
79+
"state": "COMPLETED"
80+
}""")
81+
@test rsp["metadata"] == [JSON3.read("""{
82+
"relationId": "/:output/Int64",
83+
"types": [
84+
":output",
85+
"Int64"
86+
]
87+
}""")]
88+
@test rsp["problems"] == Union{}[]
89+
90+
# Test for the expected arrow data:
91+
expected_data = make_arrow_table([4])
92+
# Arrow.Tables can't be compared via == (https://github.com/apache/arrow-julia/issues/310)
93+
@test length(rsp["results"]) == 1
94+
@test rsp["results"][1][1] == "/:output/Int64"
95+
@test collect(rsp["results"][1][2]) == collect(expected_data)
96+
end
97+
end
98+
end

test/runtests.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ import Tables
1818
using RAI
1919
using Test
2020

21+
@testset "api.jl" begin
22+
include("api.jl")
23+
end
24+
2125
# def output =
2226
# 1, "foo", 3.4, :foo;
2327
# 2, "bar", 5.6, :foo

0 commit comments

Comments
 (0)