Skip to content

Commit

Permalink
Issue 3489: FileSystemStorage should write data synchronously to avoi…
Browse files Browse the repository at this point in the history
…d data loss (pravega#3491)

* Calls FileChannel.force() at the end of write and concat.

Signed-off-by: Sachin Joshi <[email protected]>
  • Loading branch information
sachin-j-joshi authored and fpj committed Mar 28, 2019
1 parent d463831 commit c8e4574
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,28 +303,29 @@ private Void doWrite(SegmentHandle handle, long offset, InputStream data, int le
throw new StreamSegmentSealedException(handle.getSegmentName());
}

long fileSize = path.toFile().length();
if (fileSize < offset) {
throw new BadOffsetException(handle.getSegmentName(), fileSize, offset);
} else {
long totalBytesWritten = 0;
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
// Wrap the input data into a ReadableByteChannel, but do not close it. Doing so will result in closing
// the underlying InputStream, which is not desirable if it is to be reused.
ReadableByteChannel sourceChannel = Channels.newChannel(data);
while (length != 0) {
long bytesWritten = channel.transferFrom(sourceChannel, offset, length);
assert bytesWritten > 0 : "Unable to make any progress transferring data.";
offset += bytesWritten;
totalBytesWritten += bytesWritten;
length -= bytesWritten;
}
long totalBytesWritten = 0;
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
long fileSize = channel.size();
if (fileSize != offset) {
throw new BadOffsetException(handle.getSegmentName(), fileSize, offset);
}
FileSystemMetrics.WRITE_LATENCY.reportSuccessEvent(timer.getElapsed());
FileSystemMetrics.WRITE_BYTES.add(totalBytesWritten);
LoggerHelpers.traceLeave(log, "write", traceId);
return null;

// Wrap the input data into a ReadableByteChannel, but do not close it. Doing so will result in closing
// the underlying InputStream, which is not desirable if it is to be reused.
ReadableByteChannel sourceChannel = Channels.newChannel(data);
while (length != 0) {
long bytesWritten = channel.transferFrom(sourceChannel, offset, length);
assert bytesWritten > 0 : "Unable to make any progress transferring data.";
offset += bytesWritten;
totalBytesWritten += bytesWritten;
length -= bytesWritten;
}
channel.force(false);
}
FileSystemMetrics.WRITE_LATENCY.reportSuccessEvent(timer.getElapsed());
FileSystemMetrics.WRITE_BYTES.add(totalBytesWritten);
LoggerHelpers.traceLeave(log, "write", traceId);
return null;
}

private boolean isWritableFile(Path path) throws IOException {
Expand Down Expand Up @@ -377,6 +378,7 @@ private Void doConcat(SegmentHandle targetHandle, long offset, String sourceSegm
offset += bytesTransferred;
length -= bytesTransferred;
}
targetChannel.force(false);
Files.delete(sourcePath);
LoggerHelpers.traceLeave(log, "concat", traceId);
return null;
Expand Down

0 comments on commit c8e4574

Please sign in to comment.