Skip to content

Commit 7def317

Browse files
committed
fix: prevent multiple queries on close when mapping/aggregating (resolved gh-372)
1 parent db043ab commit 7def317

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

redis-om-spring/src/main/java/com/redis/om/spring/search/stream/SearchStreamImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ public <T> SearchStream<T> map(Function<? super E, ? extends T> mapper) {
184184
return new WrapperSearchStream<>(resolveStream().map(mapper));
185185
}
186186

187+
resolvedStream = Stream.empty();
188+
187189
return new ReturnFieldsSearchStreamImpl<>(this, returning, mappingConverter, getGson(), isDocument);
188190
}
189191

@@ -334,6 +336,7 @@ public long count() {
334336
Query query = (rootNode.toString().isBlank()) ? new Query() : new Query(rootNode.toString());
335337
query.limit(0, 0);
336338
SearchResult searchResult = search.search(query);
339+
resolvedStream = Stream.empty();
337340

338341
return searchResult.getTotalResults();
339342
}
@@ -566,6 +569,8 @@ public Stream<Long> map(ToLongFunction<? super E> mapper) {
566569

567570
result = wrappedIds.mapToLong(mapper).boxed();
568571
}
572+
resolvedStream = Stream.empty();
573+
569574
return result;
570575
}
571576

@@ -576,12 +581,14 @@ public Stream<Map<String, Object>> mapToLabelledMaps() {
576581

577582
@SafeVarargs @Override
578583
public final <R> AggregationStream<R> groupBy(MetamodelField<E, ?>... fields) {
584+
resolvedStream = Stream.empty();
579585
String query = (rootNode.toString().isBlank()) ? "*" : rootNode.toString();
580586
return new AggregationStreamImpl<>(searchIndex, modulesOperations, getGson(), entityClass, query, fields);
581587
}
582588

583589
@Override
584590
public <R> AggregationStream<R> apply(String expression, String alias) {
591+
resolvedStream = Stream.empty();
585592
String query = (rootNode.toString().isBlank()) ? "*" : rootNode.toString();
586593
AggregationStream<R> aggregationStream = new AggregationStreamImpl<>(searchIndex, modulesOperations, getGson(), entityClass, query);
587594
aggregationStream.apply(expression, alias);
@@ -590,6 +597,7 @@ public <R> AggregationStream<R> apply(String expression, String alias) {
590597

591598
@SafeVarargs @Override
592599
public final <R> AggregationStream<R> load(MetamodelField<E, ?>... fields) {
600+
resolvedStream = Stream.empty();
593601
String query = (rootNode.toString().isBlank()) ? "*" : rootNode.toString();
594602
AggregationStream<R> aggregationStream = new AggregationStreamImpl<>(searchIndex, modulesOperations, getGson(), entityClass, query);
595603
aggregationStream.load(fields);
@@ -598,6 +606,7 @@ public final <R> AggregationStream<R> load(MetamodelField<E, ?>... fields) {
598606

599607
@Override
600608
public <R> AggregationStream<R> loadAll() {
609+
resolvedStream = Stream.empty();
601610
String query = (rootNode.toString().isBlank()) ? "*" : rootNode.toString();
602611
AggregationStream<R> aggregationStream = new AggregationStreamImpl<>(searchIndex, modulesOperations, getGson(), entityClass, query);
603612
aggregationStream.loadAll();
@@ -606,6 +615,7 @@ public <R> AggregationStream<R> loadAll() {
606615

607616
@Override
608617
public <R> AggregationStream<R> cursor(int count, Duration timeout) {
618+
resolvedStream = Stream.empty();
609619
String query = (rootNode.toString().isBlank()) ? "*" : rootNode.toString();
610620
AggregationStream<R> aggregationStream = new AggregationStreamImpl<>(searchIndex, modulesOperations, getGson(), entityClass, query);
611621
aggregationStream.cursor(count, timeout);
@@ -614,6 +624,7 @@ public <R> AggregationStream<R> cursor(int count, Duration timeout) {
614624

615625
@Override
616626
public Optional<E> min(NumericField<E, ?> field) {
627+
resolvedStream = Stream.empty();
617628
List<Pair<String, ?>> minByField = this //
618629
.load(new MetamodelField<E, String>("__key", String.class)) //
619630
.sorted(Order.asc("@" + field.getSearchAlias()))
@@ -625,6 +636,7 @@ public Optional<E> min(NumericField<E, ?> field) {
625636

626637
@Override
627638
public Optional<E> max(NumericField<E, ?> field) {
639+
resolvedStream = Stream.empty();
628640
List<Pair<String, ?>> maxByField = this //
629641
.load(new MetamodelField<E, String>("__key", String.class)) //
630642
.sorted(1, Order.desc("@" + field.getSearchAlias()))
@@ -645,6 +657,7 @@ public Optional<E> max(NumericField<E, ?> field) {
645657

646658
@Override
647659
public Slice<E> getSlice(Pageable pageable) {
660+
resolvedStream = Stream.empty();
648661
if (pageable.getClass().isAssignableFrom(AggregationPageable.class)) {
649662
AggregationPageable ap = (AggregationPageable) pageable;
650663
AggregationResult ar = search.cursorRead(ap.getCursorId(), pageable.getPageSize());

0 commit comments

Comments
 (0)