Skip to content

Commit 5ca6565

Browse files
committed
feature: implements cursors in the EntityStreams API (resolved gh-242)
1 parent 55b32ad commit 5ca6565

17 files changed

+2217
-25
lines changed

redis-om-spring/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@
315315
</plugin>
316316
<plugin>
317317
<artifactId>maven-compiler-plugin</artifactId>
318-
<version>3.8.1</version>
318+
<version>3.10.1</version>
319319
<configuration>
320320
<source>${maven.compiler.source}</source>
321321
<target>${maven.compiler.target}</target>

redis-om-spring/src/main/java/com/redis/om/spring/RedisOMSpringProperties.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,8 @@
11
package com.redis.om.spring;
22

3-
import ai.djl.huggingface.tokenizers.HuggingFaceTokenizer;
4-
import ai.djl.modality.cv.Image;
5-
import ai.djl.modality.cv.transform.CenterCrop;
6-
import ai.djl.modality.cv.transform.Resize;
7-
import ai.djl.modality.cv.transform.ToTensor;
8-
import ai.djl.repository.zoo.Criteria;
9-
import ai.djl.translate.Pipeline;
10-
import jakarta.validation.constraints.NotBlank;
11-
import jakarta.validation.constraints.NotEmpty;
123
import jakarta.validation.constraints.NotNull;
134
import lombok.Data;
145
import org.springframework.boot.context.properties.ConfigurationProperties;
15-
import org.springframework.context.annotation.Bean;
16-
17-
import java.util.HashMap;
18-
import java.util.Map;
196

207
@ConfigurationProperties(
218
prefix = "redis.om.spring",
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package com.redis.om.spring.search.stream;
2+
3+
import com.google.gson.Gson;
4+
import com.redis.om.spring.convert.MappingRedisOMConverter;
5+
import com.redis.om.spring.util.ObjectUtils;
6+
import lombok.NonNull;
7+
import org.springframework.data.domain.*;
8+
import org.springframework.util.Assert;
9+
import redis.clients.jedis.search.aggr.AggregationResult;
10+
import redis.clients.jedis.util.SafeEncoder;
11+
12+
import java.io.Serializable;
13+
import java.util.Iterator;
14+
import java.util.List;
15+
import java.util.function.Function;
16+
import java.util.stream.Collectors;
17+
18+
public class AggregationPage<E> implements Slice<E>, Serializable {
19+
private List<E> content;
20+
private final Pageable pageable;
21+
private AggregationStream aggregationStream;
22+
private long cursorId = -1;
23+
private AggregationResult aggregationResult;
24+
private final Gson gson;
25+
private final Class<E> entityClass;
26+
private final boolean isDocument;
27+
private final MappingRedisOMConverter mappingConverter;
28+
29+
public AggregationPage(AggregationStream aggregationStream, Pageable pageable, Class<E> entityClass, Gson gson, MappingRedisOMConverter mappingConverter, boolean isDocument) {
30+
this.aggregationStream = aggregationStream;
31+
this.pageable = pageable;
32+
this.entityClass = entityClass;
33+
this.gson = gson;
34+
this.isDocument = isDocument;
35+
this.mappingConverter = mappingConverter;
36+
}
37+
38+
public AggregationPage(AggregationResult aggregationResult, Pageable pageable, Class<E> entityClass, Gson gson, MappingRedisOMConverter mappingConverter, boolean isDocument) {
39+
this.aggregationResult = aggregationResult;
40+
this.pageable = pageable;
41+
this.entityClass = entityClass;
42+
this.gson = gson;
43+
this.cursorId = aggregationResult.getCursorId();
44+
this.isDocument = isDocument;
45+
this.mappingConverter = mappingConverter;
46+
}
47+
48+
@Override
49+
public int getNumber() {
50+
return pageable.getPageNumber();
51+
}
52+
53+
@Override
54+
public int getSize() {
55+
return resolveAggregation().getResults().size();
56+
}
57+
58+
@Override
59+
public int getNumberOfElements() {
60+
return getSize();
61+
}
62+
63+
@Override
64+
public @NonNull List<E> getContent() {
65+
return resolveContent();
66+
}
67+
68+
@Override
69+
public boolean hasContent() {
70+
return !resolveContent().isEmpty();
71+
}
72+
73+
@Override
74+
public @NonNull Sort getSort() {
75+
return pageable.getSort();
76+
}
77+
78+
@Override
79+
public boolean isFirst() {
80+
return getNumber() == 0;
81+
}
82+
83+
@Override
84+
public boolean isLast() {
85+
return resolveCursorId() == 0;
86+
}
87+
88+
@Override
89+
public boolean hasNext() {
90+
return cursorId == -1 || resolveCursorId() != 0;
91+
}
92+
93+
@Override
94+
public @NonNull Pageable nextPageable() {
95+
Pageable next = PageRequest.of(getNumber() + 1, pageable.getPageSize(), pageable.getSort());
96+
return hasNext() ? new AggregationPageable(next, resolveAggregation().getCursorId()) : Pageable.unpaged();
97+
}
98+
99+
@Override
100+
public <U> @NonNull Slice<U> map(Function<? super E, ? extends U> converter) {
101+
return new SliceImpl<>(getConvertedContent(converter), pageable, hasNext());
102+
}
103+
104+
@NonNull
105+
@Override
106+
public Iterator<E> iterator() {
107+
return resolveContent().iterator();
108+
}
109+
110+
// Unsupported operations - there is no backwards navigation with RediSearch Cursors
111+
@Override
112+
public boolean hasPrevious() {
113+
return false;
114+
}
115+
116+
@Override
117+
public @NonNull Pageable previousPageable() {
118+
return Pageable.unpaged();
119+
}
120+
// END - Unsupported operations
121+
122+
private AggregationResult resolveAggregation() {
123+
if (aggregationResult == null) {
124+
aggregationResult = aggregationStream.aggregate();
125+
}
126+
return aggregationResult;
127+
}
128+
129+
private List<E> resolveContent() {
130+
if (content == null) {
131+
this.content = toEntityList(resolveAggregation());
132+
}
133+
return content;
134+
}
135+
136+
private long resolveCursorId() {
137+
if (cursorId != -1) {
138+
cursorId = resolveAggregation().getCursorId();
139+
}
140+
return cursorId;
141+
}
142+
143+
protected <U> List<U> getConvertedContent(Function<? super E, ? extends U> converter) {
144+
145+
Assert.notNull(converter, "Function must not be null");
146+
147+
return this.stream().map(converter::apply).collect(Collectors.toList());
148+
}
149+
150+
List<E> toEntityList(AggregationResult aggregationResult) {
151+
if (isDocument) {
152+
return aggregationResult.getResults().stream().map(d -> gson.fromJson(SafeEncoder.encode((byte[])d.get("$")), entityClass)).toList();
153+
} else {
154+
return aggregationResult.getResults().stream().map(h -> (E) ObjectUtils.mapToObject(h, entityClass, mappingConverter)).toList();
155+
}
156+
}
157+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.redis.om.spring.search.stream;
2+
3+
import org.springframework.data.domain.Pageable;
4+
import org.springframework.data.domain.Sort;
5+
6+
public class AggregationPageable implements Pageable {
7+
8+
private final Pageable pageable;
9+
private final long cursorId;
10+
11+
public AggregationPageable(Pageable pageable, long cursorId) {
12+
this.pageable = pageable;
13+
this.cursorId = cursorId;
14+
}
15+
16+
public long getCursorId() {
17+
return cursorId;
18+
}
19+
20+
@Override
21+
public int getPageNumber() {
22+
return pageable.getPageNumber();
23+
}
24+
25+
@Override
26+
public int getPageSize() {
27+
return pageable.getPageSize();
28+
}
29+
30+
@Override
31+
public long getOffset() {
32+
return pageable.getOffset();
33+
}
34+
35+
@Override
36+
public Sort getSort() {
37+
return pageable.getSort();
38+
}
39+
40+
@Override
41+
public Pageable next() {
42+
return null;
43+
}
44+
45+
@Override
46+
public Pageable previousOrFirst() {
47+
return null;
48+
}
49+
50+
@Override
51+
public Pageable first() {
52+
return null;
53+
}
54+
55+
@Override
56+
public Pageable withPage(int pageNumber) {
57+
return null;
58+
}
59+
60+
@Override
61+
public boolean hasPrevious() {
62+
return false;
63+
}
64+
}

redis-om-spring/src/main/java/com/redis/om/spring/search/stream/AggregationStream.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.redis.om.spring.annotations.ReducerFunction;
44
import com.redis.om.spring.metamodel.MetamodelField;
5+
import org.springframework.data.domain.PageRequest;
6+
import org.springframework.data.domain.Slice;
57
import org.springframework.data.domain.Sort.Order;
68
import redis.clients.jedis.search.aggr.AggregationResult;
79

@@ -11,6 +13,8 @@
1113
public interface AggregationStream<T> {
1214
AggregationStream<T> load(MetamodelField<?, ?>... fields);
1315

16+
AggregationStream<T> loadAll();
17+
1418
AggregationStream<T> groupBy(MetamodelField<?, ?>... fields);
1519

1620
AggregationStream<T> apply(String expression, String alias);
@@ -42,4 +46,9 @@ public interface AggregationStream<T> {
4246
AggregationResult aggregateVerbatim(Duration timeout);
4347

4448
<R extends T> List<R> toList(Class<?>... contentTypes);
49+
50+
// Cursor API
51+
AggregationStream<T> cursor(int i, Duration duration);
52+
<R extends T> Slice<R> toList(PageRequest pageRequest, Class<?>... contentTypes);
53+
<R extends T> Slice<R> toList(PageRequest pageRequest, Duration duration, Class<?>... contentTypes);
4554
}

0 commit comments

Comments
 (0)