Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class StreamStringTermsAggregator extends AbstractStringTermsAggregator {
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
protected final ResultStrategy<?, ?, ?> resultStrategy;
private boolean leafCollectorCreated = false;

public StreamStringTermsAggregator(
String name,
Expand All @@ -72,6 +73,7 @@ public void doReset() {
super.doReset();
valueCount = 0;
sortedDocValuesPerBatch = null;
this.leafCollectorCreated = false;
}

@Override
Expand All @@ -91,6 +93,13 @@ public InternalAggregation buildEmptyAggregation() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (this.leafCollectorCreated) {
throw new IllegalStateException(
"Calling " + StreamStringTermsAggregator.class.getSimpleName() + " for the second segment: " + ctx
);
} else {
this.leafCollectorCreated = true;
}
this.sortedDocValuesPerBatch = valuesSource.ordinalsValues(ctx);
this.valueCount = sortedDocValuesPerBatch.getValueCount(); // for streaming case, the value count is reset to per batch
// cardinality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,15 @@ public void testBuildAggregationsBatchWithCountOrder() throws Exception {

public void testBuildAggregationsBatchReset() throws Exception {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
Document document = new Document();
document.add(new SortedSetDocValuesField("field", new BytesRef("test")));
indexWriter.addDocument(document);
document = new Document();
document.add(new SortedSetDocValuesField("field", new BytesRef("best")));
indexWriter.addDocument(document);

try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field");

Expand All @@ -369,7 +372,7 @@ public void testBuildAggregationsBatchReset() throws Exception {
aggregator.postCollection();

StringTerms firstResult = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
assertThat(firstResult.getBuckets().size(), equalTo(1));
assertThat(firstResult.getBuckets().size(), equalTo(2));

aggregator.doReset();

Expand All @@ -379,7 +382,7 @@ public void testBuildAggregationsBatchReset() throws Exception {
aggregator.postCollection();

StringTerms secondResult = (StringTerms) aggregator.buildAggregations(new long[] { 0 })[0];
assertThat(secondResult.getBuckets().size(), equalTo(1));
assertThat(secondResult.getBuckets().size(), equalTo(2));
assertThat(secondResult.getBuckets().get(0).getDocCount(), equalTo(1L));
}
}
Expand Down Expand Up @@ -426,7 +429,7 @@ public void testMultipleBatches() throws Exception {

public void testSubAggregationWithMax() throws Exception {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
Document document = new Document();
document.add(new SortedSetDocValuesField("category", new BytesRef("electronics")));
document.add(new NumericDocValuesField("price", 100));
Expand All @@ -442,7 +445,7 @@ public void testSubAggregationWithMax() throws Exception {
document.add(new NumericDocValuesField("price", 50));
indexWriter.addDocument(document);

try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
try (IndexReader indexReader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
MappedFieldType categoryFieldType = new KeywordFieldMapper.KeywordFieldType("category");
MappedFieldType priceFieldType = new NumberFieldMapper.NumberFieldType("price", NumberFieldMapper.NumberType.LONG);
Expand Down Expand Up @@ -1162,6 +1165,41 @@ public void testReduceSingleAggregation() throws Exception {
}
}

public void testThrowOnManySegments() throws Exception {
try (Directory directory = newDirectory()) {
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
for (int i = 0; i < atLeast(2); i++) {
Document doc = new Document();
doc.add(new SortedSetDocValuesField("category", new BytesRef("electronics")));
indexWriter.addDocument(doc);
indexWriter.commit();
}
try (IndexReader reader = maybeWrapReaderEs(DirectoryReader.open(indexWriter))) {
IndexSearcher searcher = newIndexSearcher(reader);
MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("category");
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("categories").field("category")
.order(BucketOrder.count(false)); // Order by count descending

StreamStringTermsAggregator aggregator = createStreamAggregator(
null,
aggregationBuilder,
searcher,
createIndexSettings(),
new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
),
fieldType
);

// Execute the aggregator
aggregator.preCollection();
assertThrows(IllegalStateException.class, () -> { searcher.search(new MatchAllDocsQuery(), aggregator); });
}
}
}
}

private InternalAggregation buildInternalStreamingAggregation(
TermsAggregationBuilder builder,
MappedFieldType fieldType1,
Expand Down
Loading