Skip to content

Commit

Permalink
GH-44629: [C++][Acero] Use implicit_ordering for asof_join rather…
Browse files Browse the repository at this point in the history
… than `require_sequenced_output` (#44616)

### Rationale for this change
Changes in #44083 (GH-41706) unnecessarily sequence batches retrieved from the scanner, where it is only required that the batches provide an index according to the implicit input order.

### What changes are included in this PR?
Setting `implicit_ordering` causes existing code to set the batch index, which is then available to the `asof_join` node to sequence the batches into input order. This replaces some of the #44083 changes.

Some code introduced by #44083 turns out to not be required and has therefore been reverted.

### Are these changes tested?
Existing unit tests still pass.

### Are there any user-facing changes?
Reverts some breaking user-facing changes done by #44083.
* GitHub Issue: #41706
* GitHub Issue: #44629

Authored-by: Enrico Minack <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
  • Loading branch information
EnricoMi authored Feb 11, 2025
1 parent f6e2cbe commit 9410327
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 21 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
/// \brief the order of the data, defaults to Ordering::Unordered
Ordering ordering;
};

/// \brief a node that generates data from a table already loaded in memory
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
: SourceNode(plan, schema, generator, Ordering::Implicit()) {}
: SourceNode(plan, schema, generator) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;
bool implicit_ordering = scan_node_options.implicit_ordering;

RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));

Expand Down Expand Up @@ -1032,11 +1033,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
} else {
batch_gen = std::move(merged_batch_gen);
}
int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;

auto gen = MakeMappedGenerator(
std::move(batch_gen),
[scan_options, index](const EnumeratedRecordBatch& partial) mutable
-> Result<std::optional<compute::ExecBatch>> {
[scan_options](const EnumeratedRecordBatch& partial)
-> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
// than this, for example parquet's row group stats. Failing to do this leaves
// perf on the table because row group stats could be used to skip kernel execs in
Expand All @@ -1057,11 +1058,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});

auto ordering = require_sequenced_output ? Ordering::Implicit() : Ordering::Unordered();
auto ordering = implicit_ordering ? Ordering::Implicit() : Ordering::Unordered();

auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,20 +557,27 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// \brief Construct a source ExecNode which yields batches from a dataset scan.
///
/// Does not construct associated filter or project nodes.
/// Yielded batches will be augmented with fragment/batch indices to enable stable
/// ordering for simple ExecPlans.
///
/// Batches are yielded sequentially, as in single-threaded execution,
/// when require_sequenced_output=true.
///
/// Yielded batches will be augmented with fragment/batch indices when
/// implicit_ordering=true to enable stable ordering for simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output = false)
bool require_sequenced_output = false,
bool implicit_ordering = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output) {}
require_sequenced_output(require_sequenced_output),
implicit_ordering(implicit_ordering) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
bool require_sequenced_output;
bool implicit_ordering;
};

/// @}
Expand Down
12 changes: 8 additions & 4 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4077,13 +4077,15 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
cdef:
shared_ptr[CScanOptions] c_scan_options
bint require_sequenced_output=False
bint implicit_ordering=False

c_scan_options = Scanner._make_scan_options(dataset, scan_options)

require_sequenced_output=scan_options.get("require_sequenced_output", False)
implicit_ordering=scan_options.get("implicit_ordering", False)

self.wrapped.reset(
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output)
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output, implicit_ordering)
)


Expand All @@ -4101,8 +4103,8 @@ class ScanNodeOptions(_ScanNodeOptions):
expression or projection to the scan node that you also supply
to the filter or project node.
Yielded batches will be augmented with fragment/batch indices to
enable stable ordering for simple ExecPlans.
Yielded batches will be augmented with fragment/batch indices when
implicit_ordering=True to enable stable ordering for simple ExecPlans.
Parameters
----------
Expand All @@ -4111,7 +4113,9 @@ class ScanNodeOptions(_ScanNodeOptions):
**kwargs : dict, optional
Scan options. See `Scanner.from_dataset` for possible arguments.
require_sequenced_output : bool, default False
Assert implicit ordering on data.
Batches are yielded sequentially, as in single-threaded execution.
implicit_ordering : bool, default False
Preserve implicit ordering of data.
"""

def __init__(self, Dataset dataset, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions python/pyarrow/acero.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ class InMemoryDataset:
ds = DatasetModuleStub


def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False):
def _dataset_to_decl(dataset, use_threads=True, implicit_ordering=False):
decl = Declaration("scan", ScanNodeOptions(
dataset, use_threads=use_threads,
require_sequenced_output=require_sequenced_output))
implicit_ordering=implicit_ordering))

# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
Expand Down Expand Up @@ -316,15 +316,15 @@ def _perform_join_asof(left_operand, left_on, left_by,
left_source = _dataset_to_decl(
left_operand,
use_threads=use_threads,
require_sequenced_output=True)
implicit_ordering=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
)
if isinstance(right_operand, ds.Dataset):
right_source = _dataset_to_decl(
right_operand, use_threads=use_threads,
require_sequenced_output=True)
implicit_ordering=True)
else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CExpression filter

cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output, bint implicit_ordering)

shared_ptr[CScanOptions] scan_options

Expand Down

0 comments on commit 9410327

Please sign in to comment.