Skip to content

Commit

Permalink
[FLINK-32222] Automatically flush outputStreams in serializers
Browse files Browse the repository at this point in the history
  • Loading branch information
echauchot committed Jun 8, 2023
1 parent 7919a75 commit 4f1cdfa
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public int getVersion() {

@Override
public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException {
try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final ObjectOutputStream objectOutputStream =
new ObjectOutputStream(byteArrayOutputStream)) {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (final ObjectOutputStream objectOutputStream =
new ObjectOutputStream(byteArrayOutputStream)) {
final Queue<CassandraSplit> splitsToReassign =
cassandraEnumeratorState.getSplitsToReassign();
objectOutputStream.writeInt(splitsToReassign.size());
Expand All @@ -69,10 +69,8 @@ public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throw
cassandraEnumeratorState.getStartToken(), objectOutputStream);
BigIntegerSerializationUtils.write(
cassandraEnumeratorState.getMaxToken(), objectOutputStream);

objectOutputStream.flush();
return byteArrayOutputStream.toByteArray();
}
return byteArrayOutputStream.toByteArray();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ public int getVersion() {

@Override
public byte[] serialize(CassandraSplit cassandraSplit) throws IOException {
try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final ObjectOutputStream objectOutputStream =
new ObjectOutputStream(byteArrayOutputStream)) {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (final ObjectOutputStream objectOutputStream =
new ObjectOutputStream(byteArrayOutputStream)) {
BigIntegerSerializationUtils.write(
cassandraSplit.getRingRangeStart(), objectOutputStream);
BigIntegerSerializationUtils.write(
cassandraSplit.getRingRangeEnd(), objectOutputStream);
objectOutputStream.flush();
return byteArrayOutputStream.toByteArray();
}
return byteArrayOutputStream.toByteArray();
}

@Override
Expand Down

0 comments on commit 4f1cdfa

Please sign in to comment.