Skip to content

Commit a232e37

Browse files
committed
Stop deferring record batch read
It's not safe because users may not consume result.
1 parent 4941127 commit a232e37

File tree

2 files changed

+15
-39
lines changed

2 files changed

+15
-39
lines changed

lib/activerecord_adbc_adapter/database_statements.rb

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,36 +7,26 @@ def perform_query(raw_connection,
77
prepare:,
88
notification_payload:,
99
batch:)
10-
statement = raw_connection.open_statement
11-
begin
10+
raw_connection.open_statement do |statement|
1211
statement.sql_query = sql
1312
if binds.empty?
14-
reader = statement.execute do |r,|
15-
r
16-
end
13+
statement.execute[0]
1714
else
1815
statement.prepare
1916
raw_records = {}
2017
binds.zip(type_casted_binds) do |bind, type_casted_bind|
2118
raw_records[bind.name] = [type_casted_bind]
2219
end
2320
record_batch = Arrow::RecordBatch.new(raw_records)
24-
reader = statement.bind(record_batch) do
25-
statement.execute do |r,|
26-
r
27-
end
21+
statement.bind(record_batch) do
22+
statement.execute[0]
2823
end
2924
end
30-
rescue
31-
statement.release
32-
raise
33-
else
34-
Result.new(statement, reader)
3525
end
3626
end
3727

38-
def cast_result(result)
39-
result
28+
def cast_result(table)
29+
Result.new(table)
4030
end
4131

4232
# Borrowed from

lib/activerecord_adbc_adapter/result.rb

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ module ActiveRecordADBCAdapter
22
class Result
33
include Enumerable
44

5-
def initialize(statement, record_batch_reader)
6-
@statement = statement
7-
@record_batch_reader = record_batch_reader
8-
@schema = @record_batch_reader.schema
5+
def initialize(table)
6+
@table = table
7+
@schema = @table.schema
98
end
109

1110
def columns
@@ -48,20 +47,17 @@ def indexed_rows
4847
end
4948

5049
def to_arrow
51-
@table ||= consume_record_batch_reader do
52-
@record_batch_reader.read_all
53-
end
50+
@table
5451
end
5552

5653
def each_record_batch
5754
return to_enum(__method__) unless block_given?
5855

59-
consume_record_batch_reader do
60-
loop do
61-
record_batch = @record_batch_reader.read_next
62-
break if record_batch.nil?
63-
yield(record_batch)
64-
end
56+
reader = Arrow::TableBatchReader.new(@table)
57+
loop do
58+
record_batch = reader.read_next
59+
break if record_batch.nil?
60+
yield(record_batch)
6561
end
6662
end
6763

@@ -70,16 +66,6 @@ def fields
7066
@fields ||= @schema.fields
7167
end
7268

73-
def consume_record_batch_reader
74-
begin
75-
yield
76-
ensure
77-
@record_batch_reader = nil
78-
@statement.release
79-
@statement = nil
80-
end
81-
end
82-
8369
def resolve_type(data_type)
8470
case data_type
8571
when Arrow::Int32DataType

0 commit comments

Comments
 (0)