Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[bug][fn] Prevent putstate uses empty values (apache#22127)
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet authored Feb 27, 2024
1 parent ae050a1 commit 430f4ff
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1235,15 +1235,17 @@ public void putFunctionState(final String tenant,
try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
ByteBuffer data;
if (state.getStringValue() != null) {
data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
} else if (state.getByteValue() != null) {
data = ByteBuffer.wrap(state.getByteValue());
} else if (state.getNumberValue() != null) {
data = ByteBuffer.allocate(Long.BYTES);
data.putLong(state.getNumberValue());
if (state.getByteValue() == null || state.getByteValue().length == 0) {
if (state.getStringValue() != null) {
data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
} else if (state.getNumberValue() != null) {
data = ByteBuffer.allocate(Long.BYTES);
data.putLong(state.getNumberValue());
} else {
throw new IllegalArgumentException("Invalid state value");
}
} else {
throw new IllegalArgumentException("Invalid state value");
data = ByteBuffer.wrap(state.getByteValue());
}
store.put(key, data);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.base.Utf8;
import com.google.gson.Gson;
import java.util.Base64;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -112,6 +113,20 @@ private void doTestPythonWordCountFunction(String functionName) throws Exception
String expectNumber = "\"numberValue\": 20";
putAndQueryState(functionName, "test-number", numberState, expectNumber);

byte[] valueBytes = Base64.getDecoder().decode(VALUE_BASE64);
String bytesString = Base64.getEncoder().encodeToString(valueBytes);
String byteState = "{\"key\":\"test-bytes\",\"byteValue\":\"" + bytesString + "\"}";
putAndQueryStateByte(functionName, "test-bytes", byteState, valueBytes);

String valueStr = "hello pulsar";
byte[] valueStrBytes = valueStr.getBytes(UTF_8);
String bytesStrString = Base64.getEncoder().encodeToString(valueStrBytes);
String byteStrState = "{\"key\":\"test-str-bytes\",\"byteValue\":\"" + bytesStrString + "\"}";
putAndQueryState(functionName, "test-str-bytes", byteStrState, valueStr);

String byteStrStateWithEmptyValues = "{\"key\":\"test-str-bytes\",\"byteValue\":\"" + bytesStrString + "\",\"stringValue\":\"\",\"numberValue\":0}";
putAndQueryState(functionName, "test-str-bytes", byteStrStateWithEmptyValues, valueStr);

// delete function
deleteFunction(functionName);

Expand Down Expand Up @@ -539,6 +554,33 @@ private void putAndQueryState(String functionName, String key, String state, Str
assertTrue(result.getStdout().contains(expect));
}

private void putAndQueryStateByte(String functionName, String key, String state, byte[] expect)
throws Exception {
container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"putstate",
"--tenant", "public",
"--namespace", "default",
"--name", functionName,
"--state", state
);

ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"querystate",
"--tenant", "public",
"--namespace", "default",
"--name", functionName,
"--key", key
);

FunctionState byteState = new Gson().fromJson(result.getStdout(), FunctionState.class);
assertNull(byteState.getStringValue());
assertEquals(byteState.getByteValue(), expect);
}

private void publishAndConsumeMessages(String inputTopic,
String outputTopic,
int numMessages) throws Exception {
Expand Down

0 comments on commit 430f4ff

Please sign in to comment.