diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java index bb8533b6..0d4dffd8 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java @@ -49,9 +49,9 @@ public int getVersion() { @Override public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException { - try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = - new ObjectOutputStream(byteArrayOutputStream)) { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { final Queue splitsToReassign = cassandraEnumeratorState.getSplitsToReassign(); objectOutputStream.writeInt(splitsToReassign.size()); @@ -69,10 +69,8 @@ public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throw cassandraEnumeratorState.getStartToken(), objectOutputStream); BigIntegerSerializationUtils.write( cassandraEnumeratorState.getMaxToken(), objectOutputStream); - - objectOutputStream.flush(); - return byteArrayOutputStream.toByteArray(); } + return byteArrayOutputStream.toByteArray(); } @Override diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java index 888998d3..a7f68d1f 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java @@ -44,16 +44,15 @@ public int getVersion() { @Override public byte[] serialize(CassandraSplit cassandraSplit) throws IOException { - try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = - new ObjectOutputStream(byteArrayOutputStream)) { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { BigIntegerSerializationUtils.write( cassandraSplit.getRingRangeStart(), objectOutputStream); BigIntegerSerializationUtils.write( cassandraSplit.getRingRangeEnd(), objectOutputStream); - objectOutputStream.flush(); - return byteArrayOutputStream.toByteArray(); } + return byteArrayOutputStream.toByteArray(); } @Override