Merge branch 'fix-45394' of https://github.com/JOBIN-SABU/arrow into fix-45394
JOBIN-SABU committed Feb 12, 2025
2 parents bafd6b7 + 2c0ca04 commit 8acf213
Showing 19 changed files with 477 additions and 177 deletions.
2 changes: 2 additions & 0 deletions c_glib/test/test-memory-pool.rb
@@ -20,6 +20,8 @@ class TestMemoryPool < Test::Unit::TestCase

def setup
@memory_pool = Arrow::MemoryPool.default
# Our tests assume that some memory is allocated.
@buffer = Arrow::ResizableBuffer.new(1)
end

def test_bytes_allocated
9 changes: 7 additions & 2 deletions cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -1256,14 +1256,19 @@ endif()
# - Gandiva has a compile-time (header-only) dependency on Boost, not runtime.
# - Tests need Boost at runtime.
# - S3FS and Flight benchmarks need Boost at runtime.
# - arrow_testing uses boost::filesystem. So arrow_testing requires
#   the Boost library. (boost::filesystem isn't header-only.) But if we
#   use arrow_testing as a static library without
#   using arrow::util::Process, we don't need boost::filesystem.
if(ARROW_BUILD_INTEGRATION
OR ARROW_BUILD_TESTS
OR ARROW_FUZZING
OR (ARROW_FLIGHT AND (ARROW_TESTING OR ARROW_BUILD_BENCHMARKS))
OR (ARROW_S3 AND ARROW_BUILD_BENCHMARKS))
OR (ARROW_S3 AND ARROW_BUILD_BENCHMARKS)
OR (ARROW_TESTING AND ARROW_BUILD_SHARED))
set(ARROW_USE_BOOST TRUE)
set(ARROW_BOOST_REQUIRE_LIBRARY TRUE)
elseif(ARROW_GANDIVA
OR ARROW_TESTING
OR ARROW_WITH_THRIFT
OR (NOT ARROW_USE_NATIVE_INT128))
set(ARROW_USE_BOOST TRUE)
52 changes: 33 additions & 19 deletions cpp/src/arrow/dataset/file_json.cc
@@ -119,7 +119,8 @@ class JsonFragmentScanner : public FragmentScanner {
return Status::Invalid("Number of batches calculation overflowed");
}

auto future = json::StreamingReader::MakeAsync(
inspected.stream, format_options.read_options, parse_options,
io::default_io_context(), cpu_executor);

@@ -131,24 +132,20 @@ return future.Then([num_batches, block_size](const ReaderPtr& reader)
}

// Check if the input stream has only one JSON object and no newline
auto stream_data = inspected.stream->Read();
if (!stream_data.ok()) {
// Handle the error appropriately
return Status::IOError("Failed to read from the input stream");
}

std::string json_content = stream_data->ToString();
if (!json_content.empty() && json_content.back() != '\n') {
json_content += '\n'; // Append a newline to fix the issue
}

// Create a new InputStream with fixed content
inspected.stream = std::make_shared<io::BufferReader>(Buffer::FromString(json_content));

return std::make_shared<JsonFragmentScanner>(reader, num_batches, block_size);
});
auto stream_data = inspected.stream->Read();
if (stream_data.ok()) {
std::string json_content = stream_data->ToString();
if (!json_content.empty() && json_content.back() != '\n') {
json_content += '\n'; // Append a newline to fix the issue
}
// Create a new InputStream with fixed content
inspected.stream = std::make_shared<io::BufferReader>(Buffer::FromString(json_content));
}

return std::make_shared<JsonFragmentScanner>(reader, num_batches, block_size);
}

private:
@@ -329,8 +326,25 @@ Result<Future<ReaderPtr>> DoOpenReader(
json::UnexpectedFieldBehavior::Ignore;
}
return json::StreamingReader::MakeAsync(
std::move(state->stream), state->read_options, state->parse_options);
});
std::move(state->stream), state->read_options, state->parse_options)
.Then([](const ReaderPtr& reader) -> Result<ReaderPtr> {
auto stream_data = reader->stream->Read();
if (!stream_data.ok()) {
return Status::IOError("Failed to read from input stream");
}

std::string json_content = stream_data->ToString();
if (!json_content.empty() && json_content.back() != '\n') {
json_content += '\n'; // Append a newline to fix the issue
}

// Create a new InputStream with fixed content
reader->stream = std::make_shared<io::BufferReader>(Buffer::FromString(json_content));

return reader;
}, [path = source.path()](const Status& error) -> Result<ReaderPtr> {
return error.WithMessage("Could not open JSON input source '", path, "': ", error);
});
ARROW_ASSIGN_OR_RAISE(auto future, maybe_future);
return future.Then([](const ReaderPtr& reader) -> Result<ReaderPtr> { return reader; },
[path = source.path()](const Status& error) -> Result<ReaderPtr> {
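Editor's note: per the branch name and the code comments above, GH-45394 concerns JSON input that consists of a single object with no trailing newline; the hunks above compensate by appending '\n' before handing the stream to the reader. A minimal Python sketch of that input shape (the payload and expected output are illustrative assumptions, not taken from the commit):

import io

import pyarrow.json as pj

# A single JSON object with no trailing newline -- the case the C++
# change above works around by appending '\n' before parsing.
buf = io.BytesIO(b'{"a": 1, "b": "x"}')
table = pj.read_json(buf)
print(table.to_pydict())  # expected: {'a': [1], 'b': ['x']}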
1 change: 1 addition & 0 deletions docs/source/python/api/formats.rst
@@ -66,6 +66,7 @@ JSON Files

ReadOptions
ParseOptions
open_json
read_json

.. _api.parquet:
12 changes: 12 additions & 0 deletions docs/source/python/json.rst
@@ -115,3 +115,15 @@ and pass it to :func:`read_json`. For example, you can pass an explicit

Similarly, you can choose performance settings by passing a
:class:`ReadOptions` instance to :func:`read_json`.


Incremental reading
-------------------

For memory-constrained environments, it is also possible to read a JSON file
one batch at a time, using :func:`open_json`.

In this case, type inference is done on the first block and types are frozen afterwards.
To make sure the right data types are inferred, either set
:attr:`ReadOptions.block_size` to a large enough value, or use
:attr:`ParseOptions.explicit_schema` to set the desired data types explicitly.
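Editor's note: a minimal usage sketch of the incremental API documented above, assuming a newline-delimited file data.jsonl exists (the file name and schema are illustrative):

import pyarrow as pa
import pyarrow.json as pj

# Freeze the data types up front instead of relying on inference over
# the first block only.
opts = pj.ParseOptions(
    explicit_schema=pa.schema([("id", pa.int64()), ("name", pa.string())]))

with pj.open_json("data.jsonl", parse_options=opts) as reader:
    for batch in reader:
        ...  # process one pyarrow.RecordBatch at a time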
7 changes: 0 additions & 7 deletions python/pyarrow/__init__.py
@@ -58,14 +58,7 @@ def parse_git(root, **kwargs):
except ImportError:
__version__ = None

# ARROW-8684: Disable GC while initializing Cython extension module,
# to workaround Cython bug in https://github.com/cython/cython/issues/3603
_gc_enabled = _gc.isenabled()
_gc.disable()
import pyarrow.lib as _lib
if _gc_enabled:
_gc.enable()

from pyarrow.lib import (BuildInfo, RuntimeInfo, set_timezone_db_path,
MonthDayNano, VersionInfo, cpp_build_info,
cpp_version, cpp_version_info, runtime_info,
2 changes: 1 addition & 1 deletion python/pyarrow/_csv.pyx
@@ -1295,7 +1295,7 @@ def open_csv(input_file, read_options=None, parse_options=None,
Options for converting CSV data
(see pyarrow.csv.ConvertOptions constructor for defaults)
memory_pool : MemoryPool, optional
Pool to allocate Table memory from
Pool to allocate RecordBatch memory from

Returns
-------
4 changes: 2 additions & 2 deletions python/pyarrow/_flight.pyx
@@ -2460,8 +2460,8 @@ cdef dict convert_headers(const CCallHeaders& c_headers):
CCallHeaders.const_iterator header_iter = c_headers.cbegin()
headers = {}
while header_iter != c_headers.cend():
header = c_string(deref(header_iter).first).decode("ascii")
value = c_string(deref(header_iter).second)
header = to_string(deref(header_iter).first).decode("ascii")
value = to_string(deref(header_iter).second)
if not header.endswith("-bin"):
# Text header values in gRPC (and HTTP/1, HTTP/2) are
# required to be valid ASCII. Binary header values are
78 changes: 77 additions & 1 deletion python/pyarrow/_json.pyx
@@ -21,7 +21,9 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, MemoryPool,

from pyarrow.lib cimport (_Weakrefable, Schema,
RecordBatchReader, MemoryPool,
maybe_unbox_memory_pool,
get_input_stream, pyarrow_wrap_table,
pyarrow_wrap_schema, pyarrow_unwrap_schema)
@@ -266,6 +268,38 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out):
out[0] = parse_options.options


cdef class JSONStreamingReader(RecordBatchReader):
"""An object that reads record batches incrementally from a JSON file.
Should not be instantiated directly by user code.
"""
cdef readonly:
Schema schema

def __init__(self):
raise TypeError(f"Do not call {self.__class__.__name__}'s "
"constructor directly, "
"use pyarrow.json.open_json() instead.")

cdef _open(self, shared_ptr[CInputStream] stream,
CJSONReadOptions c_read_options,
CJSONParseOptions c_parse_options,
MemoryPool memory_pool):
cdef:
shared_ptr[CSchema] c_schema
CIOContext io_context

io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

with nogil:
self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
CJSONStreamingReader.Make(stream, move(c_read_options),
move(c_parse_options), io_context))
c_schema = self.reader.get().schema()

self.schema = pyarrow_wrap_schema(c_schema)


def read_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
@@ -308,3 +342,45 @@ def read_json(input_file, read_options=None, parse_options=None,
table = GetResultValue(reader.get().Read())

return pyarrow_wrap_table(table)


def open_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
Open a streaming reader of JSON data.

Reading using this function is always single-threaded.

Parameters
----------
input_file : string, path or file-like object
The location of JSON data. If a string or path, and if it ends
with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
the data is automatically decompressed when reading.
read_options : pyarrow.json.ReadOptions, optional
Options for the JSON reader (see pyarrow.json.ReadOptions constructor
for defaults)
parse_options : pyarrow.json.ParseOptions, optional
Options for the JSON parser
(see pyarrow.json.ParseOptions constructor for defaults)
memory_pool : MemoryPool, optional
Pool to allocate RecordBatch memory from

Returns
-------
:class:`pyarrow.json.JSONStreamingReader`
"""
cdef:
shared_ptr[CInputStream] stream
CJSONReadOptions c_read_options
CJSONParseOptions c_parse_options
JSONStreamingReader reader

_get_reader(input_file, &stream)
_get_read_options(read_options, &c_read_options)
_get_parse_options(parse_options, &c_parse_options)

reader = JSONStreamingReader.__new__(JSONStreamingReader)
reader._open(stream, move(c_read_options), move(c_parse_options),
memory_pool)
return reader
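Editor's note: a quick usage sketch of the new binding (the input is constructed inline; the values are illustrative assumptions):

import io

import pyarrow.json as pj

data = io.BytesIO(b'{"x": 1}\n{"x": 2}\n')
reader = pj.open_json(data)
print(reader.schema)  # inferred from the first block

for batch in reader:  # JSONStreamingReader iterates RecordBatches
    print(batch.num_rows)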
2 changes: 1 addition & 1 deletion python/pyarrow/_parquet_encryption.pyx
@@ -21,7 +21,7 @@
from datetime import timedelta

from cython.operator cimport dereference as deref
from libcpp.memory cimport shared_ptr

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport _Weakrefable
74 changes: 24 additions & 50 deletions python/pyarrow/includes/common.pxd
@@ -18,11 +18,14 @@
# distutils: language = c++

from libc.stdint cimport *

from libcpp cimport bool as c_bool, nullptr
from libcpp.functional cimport function
from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
from libcpp.memory cimport (shared_ptr, unique_ptr, make_shared,
static_pointer_cast, dynamic_pointer_cast)
from libcpp.optional cimport nullopt, optional
from libcpp.string cimport string as c_string
from libcpp.utility cimport pair
from libcpp.utility cimport move, pair
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
from libcpp.unordered_set cimport unordered_set
@@ -32,54 +35,27 @@ from cpython.datetime cimport PyDateTime_DateTime
cimport cpython


cdef extern from * namespace "std" nogil:
cdef shared_ptr[T] static_pointer_cast[T, U](shared_ptr[U])


cdef extern from "<optional>" namespace "std" nogil:
cdef cppclass optional[T]:
ctypedef T value_type
optional()
optional(nullopt_t)
optional(optional&) except +
optional(T&) except +
c_bool has_value()
T& value()
T& value_or[U](U& default_value)
void swap(optional&)
void reset()
T& emplace(...)
T& operator*()
# T* operator->() # Not Supported
optional& operator=(optional&)
optional& operator=[U](U&)
cdef extern from "<string_view>" namespace "std" nogil:
# Needed until https://github.com/cython/cython/issues/6651 is fixed
cdef cppclass cpp_string_view "std::string_view":
string_view()
string_view(const char*)
string_view(c_string&)
size_t size()
bint empty()
const char* data()


# vendored from the cymove project https://github.com/ozars/cymove
cdef extern from * namespace "cymove" nogil:
"""
#include <type_traits>
#include <utility>
namespace cymove {
template <typename T>
inline typename std::remove_reference<T>::type&& cymove(T& t) {
return std::move(t);
}
template <typename T>
inline typename std::remove_reference<T>::type&& cymove(T&& t) {
return std::move(t);
}
} // namespace cymove
"""
cdef T move" cymove::cymove"[T](T)

cdef extern from * namespace "arrow::py" nogil:
"""
#include <memory>
#include <string>
#include <string_view>
#include <utility>
namespace arrow {
namespace py {
template <typename T>
std::shared_ptr<T> to_shared(std::unique_ptr<T>& t) {
return std::move(t);
@@ -88,10 +64,17 @@ cdef extern from * namespace "arrow::py" nogil:
std::shared_ptr<T> to_shared(std::unique_ptr<T>&& t) {
return std::move(t);
}
// Needed until https://github.com/cython/cython/issues/6651 is fixed
inline std::string to_string(std::string_view s) {
return std::string(s);
}
} // namespace py
} // namespace arrow
"""
cdef shared_ptr[T] to_shared" arrow::py::to_shared"[T](unique_ptr[T])
cdef c_string to_string(cpp_string_view s)

cdef extern from "arrow/python/platform.h":
pass
@@ -173,12 +156,3 @@ cdef inline object PyObject_to_object(PyObject* o):
cdef object result = <object> o
cpython.Py_DECREF(result)
return result


cdef extern from "<string_view>" namespace "std" nogil:
cdef cppclass cpp_string_view "std::string_view":
string_view()
string_view(const char*)
size_t size()
bint empty()
const char* data()