@@ -625,21 +625,6 @@ std::unique_ptr<SourceMerger> SpillMerger::createSourceMerger(
625
625
type, std::move (streams), maxOutputBatchRows, maxOutputBatchBytes, pool);
626
626
}
627
627
628
- // static.
629
- void SpillMerger::asyncReadFromSpillFileStream (
630
- const std::weak_ptr<SpillMerger>& mergeHolder,
631
- size_t streamIdx) {
632
- TestValue::adjust (
633
- " facebook::velox::exec::SpillMerger::asyncReadFromSpillFileStream" ,
634
- static_cast <void *>(0 ));
635
- const auto merger = mergeHolder.lock ();
636
- if (merger == nullptr ) {
637
- LOG (ERROR) << " SpillMerger is destroyed, abandon reading from batch stream" ;
638
- return ;
639
- }
640
- merger->readFromSpillFileStream (mergeHolder, streamIdx);
641
- }
642
-
643
628
void SpillMerger::readFromSpillFileStream (
644
629
const std::weak_ptr<SpillMerger>& mergeHolder,
645
630
size_t streamIdx) {
@@ -656,54 +641,45 @@ void SpillMerger::readFromSpillFileStream(
656
641
657
642
if (*error_.rlock () != nullptr ) {
658
643
ContinueFuture future{ContinueFuture::makeEmpty ()};
659
- LOG (ERROR) << " MACDUAN " << streamIdx << " th source end, push null ." ;
644
+ LOG (ERROR) << " Stop the " << streamIdx << " th source on error ." ;
660
645
sources_[streamIdx]->enqueue (nullptr , &future);
661
646
return ;
662
647
}
663
648
664
649
RowVectorPtr vector;
665
650
ContinueFuture future{ContinueFuture::makeEmpty ()};
666
651
if (!batchStreams_[streamIdx]->nextBatch (vector)) {
667
- LOG (ERROR) << " MACDUAN " << streamIdx
668
- << " th source reach th end, push null." ;
669
-
670
652
VELOX_CHECK_NULL (vector);
671
653
sources_[streamIdx]->enqueue (nullptr , &future);
672
654
VELOX_CHECK (!future.valid ());
673
655
return ;
674
656
}
657
+
675
658
const auto blockingReason =
676
659
sources_[streamIdx]->enqueue (std::move (vector), &future);
677
- // TODO: add async error handling.
678
660
if (blockingReason == BlockingReason::kNotBlocked ) {
679
661
VELOX_CHECK (!future.valid ());
680
662
readFromSpillFileStream (mergeHolder, streamIdx);
681
663
} else {
682
664
VELOX_CHECK (future.valid ());
683
- std::move (future).via (executor_).thenTry ([this , mergeHolder, streamIdx](
684
- folly::Try<folly::Unit>&&
685
- t) {
686
- const auto merger = mergeHolder.lock ();
687
- if (merger == nullptr ) {
688
- LOG (ERROR)
689
- << " SpillMerger is destroyed, abandon reading from batch stream" ;
690
- return ;
691
- }
692
- if (t.hasException ()) {
693
- LOG (ERROR) << " Stop the " << streamIdx
694
- << " th batch stream producer on error: "
695
- << t.exception ().what ();
696
- *error_.wlock () = std::make_exception_ptr (t.exception ());
697
- ContinueFuture future{ContinueFuture::makeEmpty ()};
698
- sources_[streamIdx]->enqueue (nullptr , &future);
699
- } else {
700
- readFromSpillFileStream (mergeHolder, streamIdx);
701
- }
702
- });
665
+ std::move (future).via (executor_).thenTry (
666
+ [this , mergeHolder, streamIdx](folly::Try<folly::Unit>&& t) {
667
+ const auto merger = mergeHolder.lock ();
668
+ if (merger == nullptr ) {
669
+ return ;
670
+ }
671
+ if (t.hasException ()) {
672
+ LOG (ERROR) << " Stop the " << streamIdx << " th source on error." ;
673
+ *error_.wlock () = std::make_exception_ptr (t.exception ());
674
+ ContinueFuture future{ContinueFuture::makeEmpty ()};
675
+ sources_[streamIdx]->enqueue (nullptr , &future);
676
+ } else {
677
+ readFromSpillFileStream (mergeHolder, streamIdx);
678
+ }
679
+ });
703
680
}
704
681
} catch (const std::exception& e) {
705
- LOG (ERROR) << " MACDUAN " << streamIdx << " th catch exception. "
706
- << e.what ();
682
+ LOG (ERROR) << " The " << streamIdx << " th catch exception. " << e.what ();
707
683
*error_.wlock () = std::current_exception ();
708
684
readFromSpillFileStream (mergeHolder, streamIdx);
709
685
}
@@ -713,11 +689,7 @@ void SpillMerger::scheduleAsyncSpillFileStreamReads() {
713
689
VELOX_CHECK_EQ (batchStreams_.size (), sources_.size ());
714
690
for (auto i = 0 ; i < batchStreams_.size (); ++i) {
715
691
executor_->add ([&, streamIdx = i]() {
716
- try {
717
- readFromSpillFileStream (std::weak_ptr (shared_from_this ()), streamIdx);
718
- } catch (...) {
719
- *error_.wlock () = std::current_exception ();
720
- }
692
+ readFromSpillFileStream (std::weak_ptr (shared_from_this ()), streamIdx);
721
693
});
722
694
}
723
695
}
0 commit comments