Added comments for implementation
Jai Balani committed Feb 13, 2025
1 parent 003663b commit 0b93349
Showing 2 changed files with 315 additions and 77 deletions.
137 changes: 121 additions & 16 deletions ambry-store/src/main/java/com/github/ambry/store/FileStore.java
@@ -41,11 +41,39 @@
* FileStore provides file system operations for the File Copy Protocol in Ambry.
* It handles reading and writing of data chunks and metadata to disk, managing file operations
* with proper serialization and error handling.
*
* Key responsibilities:
* 1. File Operations:
* - Reading/writing data chunks
* - Managing metadata files
* - Handling file streams
*
* 2. Data Integrity:
* - CRC validation
* - Atomic file operations
* - Error handling
*
* 3. Resource Management:
* - Stream handling
* - File handle cleanup
* - Memory management
*
* Thread Safety:
* - All public methods are thread-safe
* - Uses atomic operations for file writes
* - Maintains thread-safe state management
*/
public class FileStore {
// Logger instance for this class
private static final Logger logger = LoggerFactory.getLogger(FileStore.class);

// Flag to track the running state of the FileStore
private static boolean isRunning = false;

// Handles serialization/deserialization of file metadata
private final FileMetadataSerde fileMetadataSerde;

// Configuration for file copy operations
private final FileCopyConfig fileCopyConfig;

/**
@@ -54,6 +82,7 @@ public class FileStore {
* @throws NullPointerException if fileCopyConfig is null
*/
public FileStore(FileCopyConfig fileCopyConfig) {
// Initialize metadata serializer and store config
this.fileMetadataSerde = new FileMetadataSerde();
this.fileCopyConfig = fileCopyConfig;
}
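
For orientation, a minimal sketch of the lifecycle a caller would follow with this class, using only the constructor, start(), isRunning() and stop() shown in this file. How the FileCopyConfig instance is built is not part of this diff, so it is passed in from outside; the helper name is hypothetical.

// Hedged usage sketch, not part of this commit.
static void withFileStore(FileCopyConfig config) throws StoreException {
  FileStore fileStore = new FileStore(config);
  fileStore.start();               // file operations invoked before start() throw FileStoreException
  try {
    assert fileStore.isRunning();  // chunk and metadata operations would go here
  } finally {
    fileStore.stop();              // stop() just flips the running flag
  }
}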
@@ -63,6 +92,7 @@ public FileStore(FileCopyConfig fileCopyConfig) {
* @throws StoreException if the service fails to start
*/
public void start() throws StoreException {
// Mark the service as running
isRunning = true;
}

@@ -78,6 +108,7 @@ public boolean isRunning() {
* Stops the FileStore service.
*/
public void stop() {
// Mark the service as stopped
isRunning = false;
}

@@ -93,15 +124,28 @@ public void stop() {
*/
public ByteBuffer getStreamForFileRead(String mountPath, String fileName, int offset, int size)
throws IOException {
// Verify service is running before proceeding
if (!isRunning) {
throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
}

// Construct the full file path
String filePath = mountPath + "/" + fileName;
File file = new File(filePath);

// Use RandomAccessFile for seeking to specific position
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");

// Seek to the specified offset
randomAccessFile.seek(offset);

// Allocate buffer for reading data
ByteBuffer buf = ByteBuffer.allocate(size);

// Read data into buffer
randomAccessFile.getChannel().read(buf);

// Prepare buffer for reading
buf.flip();
return buf;
}
@@ -116,9 +160,12 @@ public ByteBuffer getStreamForFileRead(String mountPath, String fileName, int offset, int size)
*/
public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream)
throws IOException {
// Verify service is running
if (!isRunning) {
throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
}

// Validate input
if (fileInputStream == null) {
throw new IllegalArgumentException("fileInputStream is null");
}
@@ -128,9 +175,13 @@ public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream)
byte[] content = new byte[(int) fileSize];
fileInputStream.read(content);

// Write content to the output file with create and append options
Files.write(Paths.get(outputFilePath), content, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
logger.info("Write successful for chunk to file: {} with contents: {}", outputFilePath, new String(content));
// Write content to file with create and append options
Files.write(Paths.get(outputFilePath), content,
StandardOpenOption.CREATE, StandardOpenOption.APPEND);

// Log successful write operation
logger.info("Write successful for chunk to file: {} with contents: {}",
outputFilePath, new String(content));
}
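
To make the chunk APIs concrete, a hedged sketch of the two halves of a chunk transfer as these signatures suggest: the serving node reads a byte range into a ByteBuffer, and the receiving node appends a chunk that has already been staged in a local file. The segment name, offset, chunk size and paths are hypothetical, the network hop between the two calls is outside this class, and the usual java.io/java.nio imports are assumed.

// Serving side: read a 4 MB range starting at offset 0 of a (hypothetical) sealed log segment.
static ByteBuffer readChunkForServing(FileStore fileStore, String mountPath) throws IOException {
  return fileStore.getStreamForFileRead(mountPath, "0_log", 0, 4 * 1024 * 1024);
}

// Receiving side: append a chunk that arrived over the wire and was staged in stagedChunk.
static void appendReceivedChunk(FileStore fileStore, String targetFilePath, File stagedChunk) throws IOException {
  try (FileInputStream in = new FileInputStream(stagedChunk)) {
    fileStore.putChunkToFile(targetFilePath, in);
  }
}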

/**
@@ -142,24 +193,34 @@ public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream)
* @throws IllegalArgumentException if logInfoList is null
*/
public void persistMetaDataToFile(String mountPath, List<LogInfo> logInfoList) throws IOException {
// Verify service is running
if(!isRunning){
throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
}

// Validate input
if(logInfoList == null){
throw new IllegalArgumentException("logInfoList is null");
}

// Create temporary and actual file paths
File temp = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName + ".tmp");
File actual = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName);

try {
// Write metadata to temporary file first
FileOutputStream fileStream = new FileOutputStream(temp);
fileMetadataSerde.persist(logInfoList, fileStream);
logger.info("FileCopyMetadata file serialized and written to file: {}", actual.getAbsolutePath());
// swap temp file with the original file
logger.info("FileCopyMetadata file serialized and written to file: {}",
actual.getAbsolutePath());

// Atomically rename temp file to actual file
temp.renameTo(actual);
logger.debug("Completed writing filecopy metadata to file {}", actual.getAbsolutePath());
logger.debug("Completed writing filecopy metadata to file {}",
actual.getAbsolutePath());
} catch (IOException e) {
logger.error("IO error while persisting filecopy metadata to disk {}", temp.getAbsoluteFile());
logger.error("IO error while persisting filecopy metadata to disk {}",
temp.getAbsoluteFile());
throw e;
}
}
@@ -172,23 +233,33 @@ public void persistMetaDataToFile(String mountPath, List<LogInfo> logInfoList) throws IOException {
* @throws FileStoreException if the service is not running
*/
public List<LogInfo> readMetaDataFromFile(String mountPath) throws IOException {
// Initialize empty list for results
List<LogInfo> logInfoList = new ArrayList<>();

// Verify service is running
if(!isRunning){
throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
throw new FileStoreException("FileStore is not running",
FileStoreErrorCode.FileStoreRunningFailure);
}

// Get metadata file
File fileCopyMetaDataFile = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName);

// Return empty list if file doesn't exist
if (!fileCopyMetaDataFile.exists()) {
logger.info("fileCopyMetaDataFile {} not found", fileCopyMetaDataFile.getAbsolutePath());
return logInfoList;
}

try {
// Read metadata from file
FileInputStream fileStream = new FileInputStream(fileCopyMetaDataFile);
logger.info("Attempting reading from file: {}", fileCopyMetaDataFile.getAbsolutePath());
logInfoList = fileMetadataSerde.retrieve(fileStream);
return logInfoList;
} catch (IOException e) {
logger.error("IO error while reading filecopy metadata from disk {}", fileCopyMetaDataFile.getAbsoluteFile());
logger.error("IO error while reading filecopy metadata from disk {}",
fileCopyMetaDataFile.getAbsoluteFile());
throw e;
}
}
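
A hedged sketch of a metadata round trip through the two methods above, using the LogInfo and FileInfo constructors visible later in this file; the segment names and sizes are made up, and Collections/List come from java.util.

static void roundTripMetadata(FileStore fileStore, String mountPath) throws IOException {
  FileInfo sealedSegment = new FileInfo("0_log", 1024L);
  List<FileInfo> indexSegments = Collections.singletonList(new FileInfo("0_index", 128L));
  List<FileInfo> bloomFilters = Collections.singletonList(new FileInfo("0_bloom", 16L));
  List<LogInfo> toPersist = Collections.singletonList(new LogInfo(sealedSegment, indexSegments, bloomFilters));

  // Writes <mountPath>/<fileCopyMetaDataFileName> via a temp file that is then renamed into place.
  fileStore.persistMetaDataToFile(mountPath, toPersist);

  // Comes back empty if the metadata file is missing or its CRC does not match.
  List<LogInfo> readBack = fileStore.readMetaDataFromFile(mountPath);
}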
@@ -204,8 +275,21 @@ public void shutdown() {
/**
* Inner class that handles serialization and deserialization of file metadata.
* Implements custom serialization format with CRC checking for data integrity.
*
* Format Structure:
* 1. Number of entries (int)
* 2. For each entry:
* - Sealed segment info
* - Index segments list
* - Bloom filters list
* 3. CRC checksum (long)
*
* Thread Safety:
* - Thread-safe through synchronization on file operations
* - Immutable internal state
*/
private static class FileMetadataSerde {
// Size of CRC value in bytes
private static final short Crc_Size = 8;
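
As a worked example of the format described above, one entry with sealed segment "0_log" (1024 bytes), a single index segment "0_index" (128 bytes) and no bloom filters would serialize as the following sequence; the names are hypothetical and the trailing CRC is the 8-byte value CrcOutputStream computes over everything before it.

// int    1          number of LogInfo entries
// long   1024       sealed segment file size
// long   5          length of "0_log" in bytes
// bytes  "0_log"    sealed segment file name
// int    1          number of index segments
// long   128        index segment file size
// long   7          length of "0_index" in bytes
// bytes  "0_index"  index segment file name
// int    0          number of bloom filters
// long   CRC        checksum of all preceding bytes (Crc_Size = 8)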

/**
Expand All @@ -216,22 +300,29 @@ private static class FileMetadataSerde {
*/
public void persist(List<LogInfo> logInfoList, OutputStream outputStream)
throws IOException {
// Create CRC output stream to calculate checksum while writing
CrcOutputStream crcOutputStream = new CrcOutputStream(outputStream);
DataOutputStream writer = new DataOutputStream(crcOutputStream);
try {

// Write the size of the log info list
writer.writeInt(logInfoList.size());

// Iterate through each log info entry
for (LogInfo logInfo : logInfoList) {
// write log segment size and name
// Write sealed segment information
writer.writeLong(logInfo.getSealedSegment().getFileSize());
writer.writeLong(logInfo.getSealedSegment().getFileName().getBytes().length);
writer.write(logInfo.getSealedSegment().getFileName().getBytes());

// Write index segments information
writer.writeInt(logInfo.getIndexSegments().size());
for(FileInfo fileInfo : logInfo.getIndexSegments()){
writer.writeLong(fileInfo.getFileSize());
writer.writeLong(fileInfo.getFileName().getBytes().length);
writer.write(fileInfo.getFileName().getBytes());
}

// Write bloom filters information
writer.writeInt(logInfo.getBloomFilters().size());
for(FileInfo fileInfo: logInfo.getBloomFilters()){
writer.writeLong(fileInfo.getFileSize());
Expand All @@ -240,14 +331,15 @@ public void persist(List<LogInfo> logInfoList, OutputStream outputStream)
}
}

// Write CRC value at the end for validation
long crcValue = crcOutputStream.getValue();
writer.writeLong(crcValue);
} catch (IOException e) {
logger.error("IO error while serializing Filecopy metadata", e);
throw e;
} finally {
if (outputStream instanceof FileOutputStream) {
// flush and overwrite file
// Ensure data is written to disk
((FileOutputStream) outputStream).getChannel().force(true);
}
writer.close();
@@ -261,20 +353,28 @@ public void persist(List<LogInfo> logInfoList, OutputStream outputStream)
* @throws IOException if there are issues during deserialization
*/
public List<LogInfo> retrieve(InputStream inputStream) throws IOException {
// Initialize result list
List<LogInfo> logInfoList = new ArrayList<>();

// Create CRC input stream to validate checksum while reading
CrcInputStream crcStream = new CrcInputStream(inputStream);
DataInputStream stream = new DataInputStream(crcStream);

try {
// Continue reading while there's more data than just CRC
while (stream.available() > Crc_Size) {
// Read number of log info entries
int logInfoListSize = stream.readInt();

for(int i = 0; i < logInfoListSize; i++){
// read log segment name
// Read sealed segment information
Long logSegmentSize = stream.readLong();
byte[] logSegmentNameBytes = new byte[(int) stream.readLong()];
stream.readFully(logSegmentNameBytes);
String logSegmentName = new String(logSegmentNameBytes);
FileInfo logSegment = new FileInfo(logSegmentName, logSegmentSize);
// read index segments

// Read index segments
int indexSegmentsSize = stream.readInt();
List<FileInfo> indexSegments = new ArrayList<>();
for(int j = 0; j < indexSegmentsSize; j++){
@@ -284,7 +384,8 @@ public List<LogInfo> retrieve(InputStream inputStream) throws IOException {
String indexSegmentName = new String(indexSegmentNameBytes);
indexSegments.add(new FileInfo(indexSegmentName, fileSize));
}
// read bloom filters

// Read bloom filters
int bloomFiltersSize = stream.readInt();
List<FileInfo> bloomFilters = new ArrayList<>();
for(int j = 0; j < bloomFiltersSize; j++){
Expand All @@ -294,14 +395,18 @@ public List<LogInfo> retrieve(InputStream inputStream) throws IOException {
String bloomFilterName = new String(bloomFilterNameBytes);
bloomFilters.add(new FileInfo(bloomFilterName, fileSize));
}

// Create and add LogInfo object to result list
logInfoList.add(new LogInfo(logSegment, indexSegments, bloomFilters));
}
}

// Validate CRC
long computedCrc = crcStream.getValue();
long readCrc = stream.readLong();
if (computedCrc != readCrc) {
logger.error("Crc mismatch during filecopy metadata deserialization, computed " + computedCrc + ", read " + readCrc);
logger.error("Crc mismatch during filecopy metadata deserialization, computed " +
computedCrc + ", read " + readCrc);
return new ArrayList<>();
}
return logInfoList;