diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index c7d227890cb44..41b4c52c581e1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1235,15 +1235,17 @@ public void putFunctionState(final String tenant, try { DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName); ByteBuffer data; - if (state.getStringValue() != null) { - data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8)); - } else if (state.getByteValue() != null) { - data = ByteBuffer.wrap(state.getByteValue()); - } else if (state.getNumberValue() != null) { - data = ByteBuffer.allocate(Long.BYTES); - data.putLong(state.getNumberValue()); + if (state.getByteValue() == null || state.getByteValue().length == 0) { + if (state.getStringValue() != null) { + data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8)); + } else if (state.getNumberValue() != null) { + data = ByteBuffer.allocate(Long.BYTES); + data.putLong(state.getNumberValue()); + } else { + throw new IllegalArgumentException("Invalid state value"); + } } else { - throw new IllegalArgumentException("Invalid state value"); + data = ByteBuffer.wrap(state.getByteValue()); } store.put(key, data); } catch (Throwable e) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java index f44cdffe6399d..856e4edfea023 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java @@ -28,6 +28,7 @@ import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; import com.google.common.base.Utf8; +import com.google.gson.Gson; import java.util.Base64; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -112,6 +113,20 @@ private void doTestPythonWordCountFunction(String functionName) throws Exception String expectNumber = "\"numberValue\": 20"; putAndQueryState(functionName, "test-number", numberState, expectNumber); + byte[] valueBytes = Base64.getDecoder().decode(VALUE_BASE64); + String bytesString = Base64.getEncoder().encodeToString(valueBytes); + String byteState = "{\"key\":\"test-bytes\",\"byteValue\":\"" + bytesString + "\"}"; + putAndQueryStateByte(functionName, "test-bytes", byteState, valueBytes); + + String valueStr = "hello pulsar"; + byte[] valueStrBytes = valueStr.getBytes(UTF_8); + String bytesStrString = Base64.getEncoder().encodeToString(valueStrBytes); + String byteStrState = "{\"key\":\"test-str-bytes\",\"byteValue\":\"" + bytesStrString + "\"}"; + putAndQueryState(functionName, "test-str-bytes", byteStrState, valueStr); + + String byteStrStateWithEmptyValues = "{\"key\":\"test-str-bytes\",\"byteValue\":\"" + bytesStrString + "\",\"stringValue\":\"\",\"numberValue\":0}"; + putAndQueryState(functionName, "test-str-bytes", byteStrStateWithEmptyValues, valueStr); + // delete function deleteFunction(functionName); @@ -539,6 +554,33 @@ private void putAndQueryState(String functionName, String key, String state, Str assertTrue(result.getStdout().contains(expect)); } + private void putAndQueryStateByte(String functionName, String key, String state, byte[] expect) + throws Exception { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "functions", + "putstate", + "--tenant", "public", + "--namespace", "default", + "--name", functionName, + "--state", state + ); + + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "functions", + "querystate", + "--tenant", "public", + "--namespace", "default", + "--name", functionName, + "--key", key + ); + + FunctionState byteState = new Gson().fromJson(result.getStdout(), FunctionState.class); + assertNull(byteState.getStringValue()); + assertEquals(byteState.getByteValue(), expect); + } + private void publishAndConsumeMessages(String inputTopic, String outputTopic, int numMessages) throws Exception {