Skip to content

Commit

Permalink
XREAD support for reading last message from stream (#2863)
Browse files Browse the repository at this point in the history
* Add last() utility method to the XArgs.StreamOffset

* Submitted one more file by mistake
  • Loading branch information
tishun committed May 28, 2024
1 parent 27dbd4b commit 522abfd
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
21 changes: 18 additions & 3 deletions src/main/java/io/lettuce/core/XReadArgs.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.lettuce.core;

import java.time.Duration;

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;

import java.time.Duration;

/**
* Argument list builder for the Redis <a href="https://redis.io/commands/xread">XREAD</a> and {@literal XREADGROUP} commands.
* Static import the methods from {@link XReadArgs.Builder} and call the methods: {@code block(…)} .
Expand Down Expand Up @@ -171,7 +171,7 @@ private StreamOffset(K name, String offset) {
}

/**
* Read all new arriving elements from the stream identified by {@code name}.
* Read all new arriving elements from the stream identified by {@code name} excluding any elements before this call
*
* @param name must not be {@code null}.
* @return the {@link StreamOffset} object without a specific offset.
Expand All @@ -183,6 +183,21 @@ public static <K> StreamOffset<K> latest(K name) {
return new StreamOffset<>(name, "$");
}

/**
* Read all new arriving elements from the stream identified by {@code name} including the last element added before
* this call
*
* @param name must not be {@code null}.
* @return the {@link StreamOffset} object without a specific offset.
* @since 7.0
*/
public static <K> StreamOffset<K> last(K name) {

LettuceAssert.notNull(name, "Stream must not be null");

return new StreamOffset<>(name, "+");
}

/**
* Read all new arriving elements from the stream identified by {@code name} with ids greater than the last one consumed
* by the consumer group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,6 @@
*/
package io.lettuce.core.commands;

import static io.lettuce.core.protocol.CommandType.*;
import static org.assertj.core.api.Assertions.*;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import io.lettuce.core.*;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.sync.RedisCommands;
Expand All @@ -49,6 +30,23 @@
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.condition.EnabledOnCommand;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import javax.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static io.lettuce.core.protocol.CommandType.XINFO;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration tests for {@link io.lettuce.core.api.sync.RedisStreamCommands}.
Expand Down Expand Up @@ -316,7 +314,7 @@ public void xreadTransactional() {
redis.multi();
redis.xadd("stream-1", Collections.singletonMap("key3", "value3"));
redis.xadd("stream-2", Collections.singletonMap("key4", "value4"));
redis.xread(StreamOffset.from("stream-1", initial1), XReadArgs.StreamOffset.from("stream-2", initial2));
redis.xread(StreamOffset.from("stream-1", initial1), StreamOffset.from("stream-2", initial2));

TransactionResult exec = redis.exec();

Expand All @@ -337,6 +335,23 @@ public void xreadTransactional() {
assertThat(secondMessage.getBody()).containsEntry("key4", "value4");
}

@Test
public void xreadLastVsLatest() {
redis.xadd("stream-1", Collections.singletonMap("key1", "value1"));
redis.xadd("stream-1", Collections.singletonMap("key2", "value2"));

List<StreamMessage<String, String>> lastMessages = redis.xread(StreamOffset.last("stream-1"));
List<StreamMessage<String, String>> latestMessages = redis.xread(StreamOffset.latest("stream-1"));

assertThat(lastMessages).hasSize(1);
StreamMessage<String, String> lastMessage = lastMessages.get(0);

assertThat(lastMessage.getStream()).isEqualTo("stream-1");
assertThat(lastMessage.getBody()).hasSize(1).containsEntry("key2", "value2");

assertThat(latestMessages).isEmpty();
}

@Test
void xinfoStream() {

Expand Down

0 comments on commit 522abfd

Please sign in to comment.