diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java new file mode 100644 index 0000000000..f47e79b074 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java @@ -0,0 +1,21 @@ +package com.github.ambry.clustermap; + +public class FileStoreException extends RuntimeException{ + + private static final long serialVersionUID = 1L; + private final FileStoreErrorCode error; + + public FileStoreException(String s, FileStoreErrorCode error) { + super(s); + this.error = error; + } + + public FileStoreException(String s, FileStoreErrorCode error, Throwable throwable) { + super(s, throwable); + this.error = error; + } + + public enum FileStoreErrorCode{ + FileStoreRunningFailure + } +} \ No newline at end of file diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java index 8bb0b73199..4d7a3580d8 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java @@ -83,6 +83,6 @@ public enum TransitionErrorCode { /** * If Bootstap Controller fails in pre-filecopy steps for specific replica. */ - BootstrapControllerFailure + BootstrapControllerFailure, } } diff --git a/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java new file mode 100644 index 0000000000..e84553ff17 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java @@ -0,0 +1,37 @@ +package com.github.ambry.config; + +public class FileCopyConfig { + + public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk"; + @Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK) + public final int parallelPartitionHydrationCountPerDisk; + + public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads"; + @Config(NUMBER_OF_FILE_COPY_THREADS) + public final int numberOfFileCopyThreads; + + public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes"; + @Config(FILE_CHUNK_TIMEOUT_IN_MINUTES) + public final long fileChunkTimeoutInMins; + + /** + * The frequency at which the data gets flushed to disk + */ + public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.interval.In.MBs"; + @Config(STORE_DATA_FLUSH_INTERVAL_IN_MBS) + @Default("1000") + public final long storeDataFlushIntervalInMbs; + + public static final String FILE_COPY_META_DATA_FILE_NAME = "file.copy.meta.data.file.name"; + @Config(FILE_COPY_META_DATA_FILE_NAME) + @Default("sealed_logs_metadata_file") + public final String fileCopyMetaDataFileName; + + public FileCopyConfig(VerifiableProperties verifiableProperties) { + fileCopyMetaDataFileName = verifiableProperties.getString(FILE_COPY_META_DATA_FILE_NAME, "sealed_logs_metadata_file"); + parallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1); + numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4); + fileChunkTimeoutInMins = verifiableProperties.getInt(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5); + storeDataFlushIntervalInMbs = verifiableProperties.getLong(STORE_DATA_FLUSH_INTERVAL_IN_MBS, 1000); + } +} \ No newline at end of file 
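Illustrative example (editor's addition, not part of the patch): a minimal sketch of how the new FileCopyConfig above could be constructed, assuming the usual Ambry pattern of wrapping a java.util.Properties in a VerifiableProperties; any key that is not set falls back to the defaults coded in the constructor.

    Properties props = new Properties();
    props.setProperty("number.of.file.copy.threads", "8");
    props.setProperty("file.chunk.timeout.in.minutes", "10");
    FileCopyConfig fileCopyConfig = new FileCopyConfig(new VerifiableProperties(props));
    // numberOfFileCopyThreads == 8 and fileChunkTimeoutInMins == 10 from the overrides above;
    // parallelPartitionHydrationCountPerDisk falls back to 1, storeDataFlushIntervalInMbs to 1000,
    // and fileCopyMetaDataFileName to "sealed_logs_metadata_file".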
diff --git a/ambry-api/src/main/java/com/github/ambry/config/Http2ClientConfig.java b/ambry-api/src/main/java/com/github/ambry/config/Http2ClientConfig.java index e4f47ff53a..84df1d3718 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/Http2ClientConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/Http2ClientConfig.java @@ -203,10 +203,10 @@ public Http2ClientConfig(VerifiableProperties verifiableProperties) { http2DropRequestOnWriteAndFlushTimeout = verifiableProperties.getBoolean(HTTP2_DROP_REQUEST_ON_WRITE_AND_FLUSH_TIMEOUT, false); http2BlockingChannelAcquireTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_ACQUIRE_TIMEOUT_MS, 1000); - http2BlockingChannelSendTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_SEND_TIMEOUT_MS, 2000); - http2BlockingChannelReceiveTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_RECEIVE_TIMEOUT_MS, 5000); + http2BlockingChannelSendTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_SEND_TIMEOUT_MS, 99999); + http2BlockingChannelReceiveTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_RECEIVE_TIMEOUT_MS, 99999); http2BlockingChannelPoolShutdownTimeoutMs = - verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_POOL_SHUTDOWN_TIMEOUT_MS, 3000); + verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_POOL_SHUTDOWN_TIMEOUT_MS, 99999); http2PeerCertificateSanRegex = verifiableProperties.getString(HTTP2_PEER_CERTIFICATE_SAN_REGEX, ""); http2TimeoutAsNetworkError = verifiableProperties.getBoolean(HTTP2_TIMEOUT_AS_NETWORK_ERROR, false); http2RequestAdditionalTimeoutMs = verifiableProperties.getInt(HTTP2_REQUEST_ADDITIONAL_TIMEOUT_MS, 2500); diff --git a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java index ff7d492945..f11a9fd128 100644 --- a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java @@ -16,10 +16,14 @@ import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.store.ChunkResponse; +import com.github.ambry.store.FileStore; +import com.github.ambry.store.LogInfo; import com.github.ambry.store.Store; import com.github.ambry.store.StoreException; +import java.io.DataInputStream; +import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.FileStore; import java.util.Collection; import java.util.List; import java.util.regex.Pattern; @@ -51,6 +55,8 @@ public interface StoreManager { */ boolean addFileStore(ReplicaId replicaId); + void setUpReplica(String partitionName); + /** * Build state after filecopy is completed * @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built @@ -153,4 +159,23 @@ public interface StoreManager { * @throws IOException */ boolean isFilesExistForPattern(PartitionId partitionId, Pattern pattern) throws IOException; + + /** + * Get the list of log segment metadata files for a given partition. + * @param partitionId + * @param includeActiveLogSegment + * @return List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles + */ + List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment); + + /** + * Get the chunk from the requested file (Log segment, Index File, Bloom Filter). 
+ * @param partitionId + * @param fileName + * @param sizeInBytes + * @param startOffset + * @return FileInputStream containing the chunk of size {@code sizeInBytes} starting from {@code startOffset}. + */ + ChunkResponse getChunk(PartitionId partitionId, String fileName, long sizeInBytes, long startOffset) + throws IOException; } diff --git a/ambry-api/src/main/java/com/github/ambry/store/ChunkResponse.java b/ambry-api/src/main/java/com/github/ambry/store/ChunkResponse.java new file mode 100644 index 0000000000..01f203f5b1 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/ChunkResponse.java @@ -0,0 +1,22 @@ +package com.github.ambry.store; + +import java.io.DataInputStream; + + +public class ChunkResponse { + private final DataInputStream stream; + private final long chunkLength; + + public ChunkResponse(DataInputStream stream, long chunkLength) { + this.stream = stream; + this.chunkLength = chunkLength; + } + + public DataInputStream getStream() { + return stream; + } + + public long getChunkLength() { + return chunkLength; + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java b/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java new file mode 100644 index 0000000000..4b576a546e --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java @@ -0,0 +1,41 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package com.github.ambry.store; + + +public class FileInfo { + private final String fileName; + private final long fileSize; + + public FileInfo(String fileName, Long fileSize) { + this.fileName = fileName; + this.fileSize = fileSize; + } + + public String getFileName() { + return fileName; + } + + public Long getFileSize() { + return fileSize; + } + + @Override + public String toString() { + return "FileInfo{" + + "fileName='" + fileName + '\'' + + ", fileSize=" + fileSize + + '}'; + } +} \ No newline at end of file diff --git a/ambry-api/src/main/java/com/github/ambry/store/FileStore.java b/ambry-api/src/main/java/com/github/ambry/store/FileStore.java new file mode 100644 index 0000000000..861ba585d4 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/FileStore.java @@ -0,0 +1,257 @@ +package com.github.ambry.store; + +import com.github.ambry.clustermap.FileStoreException; +import com.github.ambry.clustermap.FileStoreException.FileStoreErrorCode; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.replication.FindToken; +import com.github.ambry.utils.CrcInputStream; +import com.github.ambry.utils.CrcOutputStream; +import com.github.ambry.utils.Pair; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FileStore { + private static final Logger logger = LoggerFactory.getLogger(FileStore.class); + private static boolean isRunning = false; + private final String dataDir; + private final FileMetadataSerde fileMetadataSerde; + private final FileCopyConfig fileCopyConfig; + + public FileStore(String dataDir, FileCopyConfig fileCopyConfig){ + this.dataDir = dataDir; + this.fileMetadataSerde = new FileMetadataSerde(); + this.fileCopyConfig = fileCopyConfig; + } + + public void start() throws StoreException { + isRunning = true; + } + public boolean isRunning() { + return isRunning; + } + public void stop() { + isRunning = false; + } + + + // TODO Moved to BlobStore as the bootstrapping node wouldn't have FileStore instantiated. 
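+ /** + * Editor's descriptive comment (not in the original patch): returns a FileInputStream positioned at the + * start of {mountPath}/{fileName}. Throws FileStoreException if the FileStore has not been started, and + * IOException if the file does not exist or is not readable; the caller is expected to close the stream. + */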
+ public FileInputStream getStreamForFileRead(String mountPath, String fileName) + throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + // TODO: Handle edge cases and validations + String filePath = mountPath + "/" + fileName; + File file = new File(filePath); + // Check if file exists and is readable + if (!file.exists() || !file.canRead()) { + throw new IOException("File doesn't exist or cannot be read: " + filePath); + } + return new FileInputStream(file); + } + + public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream) + throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + if(fileInputStream == null){ + throw new IllegalArgumentException("fileInputStream is null"); + } + // TODO: Handle edge cases and validations + + // Determine the size of the file + long fileSize = fileInputStream.available(); + + // Read all bytes from the source file and append them to the output file + + byte[] content = new byte[(int) fileSize]; // Read the content of the source file into a byte array + fileInputStream.read(content); // Read bytes into the array + Files.write(Paths.get(outputFilePath), content, StandardOpenOption.CREATE, StandardOpenOption.APPEND); + + System.out.println("Write successful for chunk to file: " + outputFilePath); + } + + // New class in input: List + public void persistMetaDataToFile(String mountPath, List logInfoList) throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + if(logInfoList == null){ + throw new IllegalArgumentException("logInfoList is null"); + } + + File temp = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName + ".tmp"); + File actual = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName); + try { + FileOutputStream fileStream = new FileOutputStream(temp); + fileMetadataSerde.persist(logInfoList, fileStream); + System.out.println("FileCopyMetadata file serialized and written to file: " + actual.getAbsolutePath()); + // swap temp file with the original file + temp.renameTo(actual); + logger.debug("Completed writing remote tokens to file {}", actual.getAbsolutePath()); + } catch (IOException e) { + logger.error("IO error while persisting tokens to disk {}", temp.getAbsoluteFile()); + throw e; + } + } + + + public List readMetaDataFromFile(String mountPath) throws IOException { + List logInfoList = new ArrayList<>(); + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + + File fileCopyMetaDataFile = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName); + if (!fileCopyMetaDataFile.exists()) { + logger.info("fileCopyMetaDataFile {} not found", fileCopyMetaDataFile.getAbsolutePath()); + return logInfoList; + } + try { + FileInputStream fileStream = new FileInputStream(fileCopyMetaDataFile); + System.out.println("Attempting reading from file: " + fileCopyMetaDataFile.getAbsolutePath()); + logInfoList = fileMetadataSerde.retrieve(fileStream); + return logInfoList; + } catch (IOException e) { + logger.error("IO error while reading filecopy metadata from disk {}", fileCopyMetaDataFile.getAbsoluteFile()); + throw e; + } + } + + public void shutdown(){ + return; + } + + /** + * Class to serialize and deserialize replica tokens + */ + private static class FileMetadataSerde 
{ + private static final short Crc_Size = 8; + private static final short VERSION_0 = 0; + private static final short CURRENT_VERSION = VERSION_0; + + public FileMetadataSerde() { + } + + /** + * Serialize the remote tokens to the file + * @param logInfoList the mapping from the replicas to the remote tokens + * @param outputStream the file output stream to write to + */ + public void persist(List logInfoList, OutputStream outputStream) + throws IOException { + CrcOutputStream crcOutputStream = new CrcOutputStream(outputStream); + DataOutputStream writer = new DataOutputStream(crcOutputStream); + try { + + writer.writeInt(logInfoList.size()); + for (LogInfo logInfo : logInfoList) { + // write log segment size and name + writer.writeLong(logInfo.getLogSegment().getFileSize()); + writer.writeLong(logInfo.getLogSegment().getFileName().getBytes().length); + writer.write(logInfo.getLogSegment().getFileName().getBytes()); + writer.writeInt(logInfo.getIndexSegments().size()); + for(FileInfo fileInfo : logInfo.getIndexSegments()){ + writer.writeLong(fileInfo.getFileSize()); + writer.writeLong(fileInfo.getFileName().getBytes().length); + writer.write(fileInfo.getFileName().getBytes()); + } + writer.writeInt(logInfo.getBloomFilters().size()); + for(FileInfo fileInfo: logInfo.getBloomFilters()){ + writer.writeLong(fileInfo.getFileSize()); + writer.writeLong(fileInfo.getFileName().getBytes().length); + writer.write(fileInfo.getFileName().getBytes()); + } + } + + long crcValue = crcOutputStream.getValue(); + writer.writeLong(crcValue); + } catch (IOException e) { + logger.error("IO error while serializing remote peer tokens", e); + throw e; + } finally { + if (outputStream instanceof FileOutputStream) { + // flush and overwrite file + ((FileOutputStream) outputStream).getChannel().force(true); + } + writer.close(); + } + } + + /** + * Deserialize the remote tokens + * @param inputStream the input stream from the persistent file + * @return the mapping from replicas to remote tokens + */ + public List retrieve(InputStream inputStream) throws IOException { + List logInfoList = new ArrayList<>(); + CrcInputStream crcStream = new CrcInputStream(inputStream); + DataInputStream stream = new DataInputStream(crcStream); + ConcurrentMap> peerTokens = new ConcurrentHashMap<>(); + try { + while (stream.available() > Crc_Size) { + int logInfoListSize = stream.readInt(); + for(int i = 0; i < logInfoListSize; i++){ + // read log segment name + Long logSegmentSize = stream.readLong(); + byte[] logSegmentNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(logSegmentNameBytes); + String logSegmentName = new String(logSegmentNameBytes); + FileInfo logSegment = new FileInfo(logSegmentName, logSegmentSize); + // read index segments + int indexSegmentsSize = stream.readInt(); + List indexSegments = new ArrayList<>(); + for(int j = 0; j < indexSegmentsSize; j++){ + Long fileSize = stream.readLong(); + byte[] indexSegmentNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(indexSegmentNameBytes); + String indexSegmentName = new String(indexSegmentNameBytes); + indexSegments.add(new FileInfo(indexSegmentName, fileSize)); + } + // read bloom filters + int bloomFiltersSize = stream.readInt(); + List bloomFilters = new ArrayList<>(); + for(int j = 0; j < bloomFiltersSize; j++){ + Long fileSize = stream.readLong(); + byte[] bloomFilterNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(bloomFilterNameBytes); + String bloomFilterName = new String(bloomFilterNameBytes); + 
bloomFilters.add(new FileInfo(bloomFilterName, fileSize)); + } + logInfoList.add(new LogInfo(logSegment, indexSegments, bloomFilters)); + } + } + + long computedCrc = crcStream.getValue(); + long readCrc = stream.readLong(); + if (computedCrc != readCrc) { + logger.error("Crc mismatch during peer token deserialization, computed " + computedCrc + ", read " + readCrc); + return new ArrayList<>(); + } + return logInfoList; + } catch (IOException e) { + logger.error("IO error deserializing remote peer tokens", e); + return new ArrayList<>(); + } finally { + stream.close(); + } + } + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java b/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java new file mode 100644 index 0000000000..2bf84e57a4 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java @@ -0,0 +1,63 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.store; + +import java.util.Collections; +import java.util.List; + + +public class LogInfo { + private FileInfo logSegment; + private List indexSegments; + private List bloomFilters; + + public LogInfo(FileInfo logSegment, List indexSegments, List bloomFilters) { + this.logSegment = logSegment; + this.indexSegments = indexSegments; + this.bloomFilters = bloomFilters; + } + + public FileInfo getLogSegment() { + return logSegment; + } + + public void setLogSegment(FileInfo logSegment) { + this.logSegment = logSegment; + } + + public List getIndexSegments() { + return Collections.unmodifiableList(indexSegments); + } + + public void setIndexSegments(List indexSegments) { + this.indexSegments = indexSegments; + } + + public List getBloomFilters() { + return Collections.unmodifiableList(bloomFilters); + } + + public void setBloomFilters(List bloomFilters) { + this.bloomFilters = bloomFilters; + } + + @Override + public String toString() { + return "LogInfo{" + + "logSegment=" + logSegment + + ", indexSegments=" + indexSegments + + ", bloomFilters=" + bloomFilters + + '}'; + } +} \ No newline at end of file diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java index 09c0a786ce..294eff93bf 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java @@ -22,8 +22,12 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.server.StoreManager; +import com.github.ambry.store.ChunkResponse; +import com.github.ambry.store.FileStore; +import com.github.ambry.store.LogInfo; import com.github.ambry.store.Store; -import java.nio.file.FileStore; +import java.io.DataInputStream; +import java.io.FileInputStream; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -81,6 +85,11 @@ public boolean addFileStore(ReplicaId replicaId) { } + @Override + public void setUpReplica(String partitionName) { + throw new 
UnsupportedOperationException("Method not supported"); + } + @Override public boolean shutdownBlobStore(PartitionId id) { try { @@ -151,6 +160,16 @@ public boolean isFilesExistForPattern(PartitionId partitionId, Pattern allLogSeg throw new UnsupportedOperationException("Method not supported"); } + @Override + public List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) { + throw new UnsupportedOperationException("Method not supported"); + } + + @Override + public ChunkResponse getChunk(PartitionId partitionId, String fileName, long sizeInBytes, long startOffset) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override public List setBlobStoreStoppedState(List partitionIds, boolean markStop) { throw new UnsupportedOperationException("Method not supported"); diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java index d9de3e9902..cb20cf7366 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java @@ -49,6 +49,8 @@ public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager { private final ConcurrentHashMap replicaToLagInfos = new ConcurrentHashMap<>(); private final ClusterMapConfig clusterMapConfig; private final ReentrantLock updateLock = new ReentrantLock(); + private final ConcurrentHashMap partitionToFileCopyLatch = new ConcurrentHashMap<>(); + private final ConcurrentHashMap partitionToFileCopySuccessLatch = new ConcurrentHashMap<>(); public AmbryReplicaSyncUpManager(ClusterMapConfig clusterMapConfig) { this.clusterMapConfig = clusterMapConfig; @@ -65,7 +67,8 @@ public void initiateBootstrap(ReplicaId replicaId) { @Override public void initiateFileCopy(ReplicaId replicaId) { - //To Be Added With File Copy Protocol + partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1)); + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false); } @Override @@ -108,7 +111,18 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep @Override public void waitForFileCopyCompleted(String partitionName) throws InterruptedException { - //To Be Added With File Copy Protocol + CountDownLatch latch = partitionToFileCopyLatch.get(partitionName); + if(latch == null) { + logger.info("Skipping file copy for existing partition {}", partitionName); + } else{ + logger.info("Waiting for new partition to {} to comeplete FileCopy", partitionName); + latch.await(); + partitionToFileCopyLatch.remove(partitionName); + if(!partitionToFileCopySuccessLatch.remove(partitionName)){ + throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyProtocolFailure); + } + logger.info("File Copy is complete on partition {}", partitionName); + } } @Override @@ -204,7 +218,8 @@ public void onBootstrapComplete(ReplicaId replicaId) { @Override public void onFileCopyComplete(ReplicaId replicaId) { - //To Be Added With File Copy Protocol + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true); + countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString()); } @Override diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java 
b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index cb82a82e13..62556e00db 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -858,27 +858,35 @@ private DataNodeConfig getDataNodeConfig() { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { try { - // 1. take actions in storage manager (add new replica if necessary) - PartitionStateChangeListener storageManagerListener = - partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener); - if (storageManagerListener != null) { - storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); - } - // 2. take actions in replication manager (add new replica if necessary) - PartitionStateChangeListener replicationManagerListener = - partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); - if (replicationManagerListener != null) { - replicationManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); - } - // 3. take actions in stats manager (add new replica if necessary) - PartitionStateChangeListener statsManagerListener = - partitionStateChangeListeners.get(StateModelListenerType.StatsManagerListener); - if (statsManagerListener != null) { - statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + if("146".equals(partitionName)){ + Thread.sleep(3600000); + } else { + // 1. take actions in storage manager (add new replica if necessary) + PartitionStateChangeListener storageManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener); + if (storageManagerListener != null) { + storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } + // 2. take actions in replication manager (add new replica if necessary) + PartitionStateChangeListener replicationManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); + if (replicationManagerListener != null) { + replicationManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } + // 3.
take actions in stats manager (add new replica if necessary) + PartitionStateChangeListener statsManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.StatsManagerListener); + if (statsManagerListener != null) { + statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } } } catch (Exception e) { localPartitionAndState.put(partitionName, ReplicaState.ERROR); - throw e; + try { + throw e; + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } } logger.info("Before setting partition {} to bootstrap", partitionName); localPartitionAndState.put(partitionName, ReplicaState.BOOTSTRAP); diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java new file mode 100644 index 0000000000..cbbf8cc6c0 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java @@ -0,0 +1,4 @@ +package com.github.ambry.filetransfer; + +public class FileCopyController { +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileBasedReplicationManager.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyManager.java similarity index 63% rename from ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileBasedReplicationManager.java rename to ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyManager.java index 998f56ca53..8bfff68cb5 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileBasedReplicationManager.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyManager.java @@ -17,30 +17,42 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.PartitionStateChangeListener; +import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.StateModelListenerType; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.FileCopyBasedReplicationConfig; +import com.github.ambry.config.FileCopyConfig; import com.github.ambry.config.StoreConfig; import com.github.ambry.network.NetworkClientFactory; import com.github.ambry.replica.prioritization.PrioritizationManager; import com.github.ambry.server.StoreManager; +import com.github.ambry.store.FileStore; +import com.github.ambry.store.StoreException; import com.github.ambry.store.StoreKeyFactory; import java.io.IOException; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FileBasedReplicationManager { +public class FileCopyManager { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final PrioritizationManager prioritizationManager; private final StoreManager storeManager; - public FileBasedReplicationManager(PrioritizationManager prioritizationManager, FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig, ClusterMapConfig clusterMapConfig, - StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, + private final FileCopyConfig fileCopyConfig; + + private final StoreConfig storeConfig; + + public FileCopyManager(PrioritizationManager prioritizationManager, FileCopyBasedReplicationConfig 
fileCopyBasedReplicationConfig, ClusterMapConfig clusterMapConfig, + FileCopyConfig fileCopyConfig, StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory, MetricRegistry metricRegistry, ClusterParticipant clusterParticipant) throws InterruptedException { + this.fileCopyConfig = fileCopyConfig; + this.storeConfig = storeConfig; if (clusterParticipant != null) { clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener, new PartitionStateChangeListenerImpl()); @@ -55,15 +67,42 @@ public FileBasedReplicationManager(PrioritizationManager prioritizationManager, } public void start() throws InterruptedException, IOException { + } + + public void callMetaDataAPI(){ + } class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { if(storeManager.getReplica(partitionName) == null){ - //storeManager.setUpReplica(partitionName); + storeManager.setUpReplica(partitionName); + } + ReplicaId replicaId = storeManager.getReplica(partitionName); + if(storeManager.getFileStore(replicaId.getPartitionId()) == null){ + storeManager.addFileStore(replicaId); + try { + storeManager.getFileStore(replicaId.getPartitionId()).start(); + } catch (StoreException e) { + throw new RuntimeException(e); + } } - prioritizationManager.addReplica(partitionName); + if(!storeManager.isFileExists(replicaId.getPartitionId(), storeConfig.storeFileCopyCompletedFileName)){ + FileStore fileStore = storeManager.getFileStore(replicaId.getPartitionId()); + PartitionId partitionId = replicaId.getPartitionId(); + List replicaIds = (List) partitionId.getReplicaIds(); + //TODO : Find the + String hostName = replicaIds.get(0).getDataNodeId().getHostname(); + + //callMetaData + //store metadata + //chunk API caller + //build state + }else{ + storeManager.buildStateForFileCopy(replicaId); + } + } @Override diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java new file mode 100644 index 0000000000..a37f7e7671 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java @@ -0,0 +1,9 @@ +package com.github.ambry.filetransfer; + +public enum FileCopyOperationState { + Start, + META_DATA_REQUEST_SENT, + META_DATA_RESPONSE_RECEIVED, + CHUNK_DATA_REQUEST_IN_PROGRESS, + CHUNK_DATA_EXCHANGE_COMPLETE, +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java new file mode 100644 index 0000000000..a6c78c1e2f --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java @@ -0,0 +1,45 @@ +package com.github.ambry.filetransfer; + +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaId; +import java.util.List; + + +public class FileCopyOperationTracker { + + private final PartitionId partitionId; + private final FileCopyOperationState fileCopyOperationState; + + + public FileCopyOperationTracker(PartitionId partitionId, FileCopyOperationState fileCopyOperationState) { + this.partitionId = partitionId; + this.fileCopyOperationState = 
fileCopyOperationState; + + } + + void start(){ + while(fileCopyOperationState.equals(FileCopyOperationState.CHUNK_DATA_EXCHANGE_COMPLETE)) { + switch (fileCopyOperationState){ + case Start: + break; + case META_DATA_REQUEST_SENT: + break; + case META_DATA_RESPONSE_RECEIVED: + break; + case CHUNK_DATA_REQUEST_IN_PROGRESS: + break; + case CHUNK_DATA_EXCHANGE_COMPLETE: + break; + } + + if (fileCopyOperationState.equals(FileCopyOperationState.Start)){ + + + List replicaIds = (List) partitionId.getReplicaIds(); + String hostName = replicaIds.get(0).getDataNodeId().getHostname(); + String partitionName = String.valueOf(partitionId.getId()); + //fileCopyOperationState = FileCopyOperationState.META_DATA_REQUEST_SENT; + } + } + } +} diff --git a/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java b/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java index 8a7b1c9ac5..a18561da7c 100644 --- a/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java +++ b/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java @@ -110,7 +110,7 @@ public static class LocalChannelRequest implements NetworkRequest { private long startTimeInMs; private int processorId; - LocalChannelRequest(RequestInfo requestInfo, int processorId) { + public LocalChannelRequest(RequestInfo requestInfo, int processorId) { this.requestInfo = requestInfo; this.processorId = processorId; startTimeInMs = System.currentTimeMillis(); diff --git a/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java b/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java index c67c1eee25..ecf6a109d4 100644 --- a/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java +++ b/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java @@ -19,6 +19,9 @@ import com.github.ambry.protocol.AdminResponse; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetChunkResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; +import com.github.ambry.protocol.FileCopyGetMetaDataResponse; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; import com.github.ambry.protocol.PurgeRequest; @@ -96,6 +99,9 @@ public RequestOrResponse getDecodedRequest(NetworkRequest networkRequest) throws case PurgeRequest: request = PurgeRequest.readFrom(dis, clusterMap); break; + case FileCopyGetMetaDataRequest: + request = FileCopyGetMetaDataRequest.readFrom(dis, clusterMap); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -153,6 +159,12 @@ public Response createErrorResponse(RequestOrResponse request, ServerErrorCode s case PurgeRequest: response = new PurgeResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode); break; + case FileCopyGetMetaDataRequest: + response = new FileCopyGetMetaDataResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode); + break; + case FileCopyGetChunkRequest: + response = new FileCopyGetChunkResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode); + break; default: throw new UnsupportedOperationException("Request type not supported"); } diff --git a/ambry-network/src/test/java/com/github/ambry/network/ServerRequestResponseHelperTest.java 
b/ambry-network/src/test/java/com/github/ambry/network/ServerRequestResponseHelperTest.java index 43b1ba134b..4938839755 100644 --- a/ambry-network/src/test/java/com/github/ambry/network/ServerRequestResponseHelperTest.java +++ b/ambry-network/src/test/java/com/github/ambry/network/ServerRequestResponseHelperTest.java @@ -26,6 +26,7 @@ import com.github.ambry.protocol.AdminRequest; import com.github.ambry.protocol.AdminRequestOrResponseType; import com.github.ambry.protocol.DeleteRequest; +import com.github.ambry.protocol.FileCopyGetChunkResponse; import com.github.ambry.protocol.GetOption; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.PartitionRequestInfo; @@ -40,14 +41,19 @@ import com.github.ambry.utils.ByteBufferChannel; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.Collections; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.*; @@ -235,6 +241,62 @@ public void replicateBlobRequestResponseTest() throws IOException { } } +// @Test +// public void doFileCopyChunkDataResponseTest() throws IOException { +// short requestVersionToUse = 1; +// String str = "Hello, Netty ByteBuf!"; +// ByteBuf byteBuf = Unpooled.copiedBuffer(str, StandardCharsets.UTF_8); +// FileCopyGetChunkResponse fileCopyGetChunkResponse = new FileCopyGetChunkResponse(requestVersionToUse, 111, +// "id1",ServerErrorCode.No_Error, new MockPartitionId(), "file1", null, +// byteBuf, 1000, 22, false); +// +// DataInputStream requestStream = serAndPrepForRead(fileCopyGetChunkResponse); +// +// FileCopyGetChunkResponse fileCopyGetChunkResponseOnNetwork = FileCopyGetChunkResponse.readFrom(requestStream, new MockClusterMap()); +// String chunkData = Utils.readIntString(fileCopyGetChunkResponseOnNetwork.getChunkDataOnReceiverNode()); +// +// Assert.assertEquals(chunkData, str); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getCorrelationId(), 111); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getFileName(), "file1"); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getChunkSizeInBytes(), 22); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getStartOffset(), 1000); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getError(), ServerErrorCode.No_Error); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getPartitionId().getId(), 0); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getPartitionId().toPathString(), "0"); +// Assert.assertEquals(fileCopyGetChunkResponseOnNetwork.getVersionId(), requestVersionToUse); +// fileCopyGetChunkResponse.release(); +// } + + @Test + public void doFileCopyChunkDataResponseTest2() throws IOException { + File file = new File("/tmp/0/0_index"); + DataInputStream chunkStreamWithSender = new DataInputStream(Files.newInputStream(file.toPath())); + + FileCopyGetChunkResponse response = new FileCopyGetChunkResponse( + FileCopyGetChunkResponse.File_Copy_Chunk_Response_Version_V1, + 0, "", ServerErrorCode.No_Error, new MockPartitionId(), + file.getName(), chunkStreamWithSender, 0, chunkStreamWithSender.available(), false); + + DataInputStream chunkResponseStream = 
serAndPrepForRead(response); + FileCopyGetChunkResponse fileCopyGetChunkResponseWithReciever = FileCopyGetChunkResponse.readFrom(chunkResponseStream, new MockClusterMap()); + + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getCorrelationId(), 0); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getFileName(), file.getName()); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getStartOffset(), 0); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getError(), ServerErrorCode.No_Error); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getPartitionId().getId(), 0); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getPartitionId().toPathString(), "0"); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getVersionId(), + FileCopyGetChunkResponse.File_Copy_Chunk_Response_Version_V1); + Assert.assertEquals(fileCopyGetChunkResponseWithReciever.getChunkStream().available(), file.length()); + +// DataInputStream chunkStreamWithReceiver = fileCopyGetChunkResponseWithReciever.getChunkStream(); +// byte[] buffer = new byte[chunkStreamWithReceiver.available()]; +// chunkStreamWithReceiver.readFully(buffer); +// +// System.out.println("Dw: " + new String(buffer)); + } + private DataInputStream serAndPrepForRead(RequestOrResponse requestOrResponse) throws IOException { DataInputStream stream; ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java index c9a75ffc8a..55bebd46bc 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java @@ -22,13 +22,15 @@ public class FileCopyGetChunkRequest extends RequestOrResponse{ - private PartitionId partitionId; - private String fileName; - private long startOffset; - private long chunkLengthInBytes; - private static final short File_Chunk_Request_Version_V1 = 1; + private final PartitionId partitionId; + private final String fileName; + private final long startOffset; + private final long chunkLengthInBytes; + public static final short File_Chunk_Request_Version_V1 = 1; private static final int File_Name_Size_In_Bytes = 4; + static short CURRENT_VERSION = File_Chunk_Request_Version_V1; + public FileCopyGetChunkRequest( short versionId, int correlationId, String clientId, PartitionId partitionId, String fileName, long startOffset, long sizeInBytes) { @@ -81,18 +83,21 @@ public long sizeInBytes() { public PartitionId getPartitionId() { return partitionId; } + public String getFileName() { return fileName; } + public long getStartOffset() { return startOffset; } + public long getChunkLengthInBytes() { return chunkLengthInBytes; } static void validateVersion(short version){ - if (version != File_Chunk_Request_Version_V1) { + if (version != CURRENT_VERSION) { throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); } } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java new file mode 100644 index 0000000000..1b31511af2 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java @@ -0,0 +1,137 @@ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; 
+import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileCopyGetChunkResponse extends Response { + private final DataInputStream chunkStream; + private final boolean isLastChunk; + private final long startOffset; + private final long chunkSizeInBytes; + private final PartitionId partitionId; + private final String fileName; + + private static final int File_Name_Field_Size_In_Bytes = 4; + public static final short File_Copy_Chunk_Response_Version_V1 = 1; + + static short CURRENT_VERSION = File_Copy_Chunk_Response_Version_V1; + + public FileCopyGetChunkResponse(short versionId, int correlationId, String clientId, ServerErrorCode errorCode, + PartitionId partitionId, String fileName, DataInputStream chunkStream, + long startOffset, long chunkSizeInBytes, boolean isLastChunk) { + super(RequestOrResponseType.FileCopyGetChunkResponse, versionId, correlationId, clientId, errorCode); + + validateVersion(versionId); + + this.chunkStream = chunkStream; + this.startOffset = startOffset; + this.chunkSizeInBytes = chunkSizeInBytes; + this.isLastChunk = isLastChunk; + this.partitionId = partitionId; + this.fileName = fileName; + } + + public FileCopyGetChunkResponse(int correlationId, String clientId, ServerErrorCode errorCode) { + this(CURRENT_VERSION, correlationId, clientId, errorCode, null, null, null, -1, -1, false); + } + + public PartitionId getPartitionId() { + return partitionId; + } + + public String getFileName() { + return fileName; + } + + public long getStartOffset() { + return startOffset; + } + + public boolean isLastChunk() { + return isLastChunk; + } + + public long getChunkSizeInBytes() { + return chunkSizeInBytes; + } + + public long sizeInBytes() { + try { + return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Field_Size_In_Bytes + + fileName.length() + Long.BYTES + Long.BYTES + 1 + Integer.BYTES + + (chunkStream != null ? 
chunkStream.available() : 0); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static FileCopyGetChunkResponse readFrom(DataInputStream chunkResponseStream, ClusterMap clusterMap) throws IOException { + RequestOrResponseType type = RequestOrResponseType.values()[chunkResponseStream.readShort()]; + if (type != RequestOrResponseType.FileCopyGetChunkResponse) { + throw new IllegalArgumentException("The type of request response is not compatible"); + } + short versionId = chunkResponseStream.readShort(); + validateVersion(versionId); + + int correlationId = chunkResponseStream.readInt(); + String clientId = Utils.readIntString(chunkResponseStream); + + ServerErrorCode errorCode = ServerErrorCode.values()[chunkResponseStream.readShort()]; + if(errorCode != ServerErrorCode.No_Error){ + return new FileCopyGetChunkResponse(correlationId, clientId, errorCode); + } + PartitionId partitionId = clusterMap.getPartitionIdFromStream(chunkResponseStream); + String fileName = Utils.readIntString(chunkResponseStream); + long startOffset = chunkResponseStream.readLong(); + long sizeInBytes = chunkResponseStream.readLong(); + boolean isLastChunk = chunkResponseStream.readBoolean(); + + return new FileCopyGetChunkResponse(versionId, correlationId, clientId, errorCode, partitionId, fileName, + chunkResponseStream, startOffset, sizeInBytes, isLastChunk); + } + + public void prepareBuffer(){ + super.prepareBuffer(); + bufferToSend.writeBytes(partitionId.getBytes()); + Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); + bufferToSend.writeLong(startOffset); + bufferToSend.writeLong(chunkSizeInBytes); + bufferToSend.writeBoolean(isLastChunk); + try { + bufferToSend.writeBytes(chunkStream, chunkStream.available()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + try { + sb.append("FileCopyGetChunkResponse[") + .append("PartitionId=").append(partitionId.getId()) + .append(", FileName=").append(fileName) + .append(", SizeInBytes=").append(chunkStream.available()) + .append(", isLastChunk=").append(isLastChunk) + .append("]"); + } catch (IOException e) { + throw new RuntimeException(e); + } + return sb.toString(); + } + + public DataInputStream getChunkStream() { + return chunkStream; + } + + private static void validateVersion(short version) { + if (version != CURRENT_VERSION) { + throw new IllegalArgumentException("Unknown version for FileCopyGetChunkResponse: " + version); + } + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse2.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse2.java new file mode 100644 index 0000000000..f02052c870 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse2.java @@ -0,0 +1,158 @@ +///** +// * Copyright 2024 LinkedIn Corp. All rights reserved. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// */ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import io.netty.buffer.CompositeByteBuf; + +/** + * This class utilizes distinct data structures to manage chunk data on the sender and receiver nodes. + * + * On the sender node, chunk data is stored in a Netty ByteBuf. A composite buffer is employed to combine + * chunk data and metadata into a single buffer, minimizing in-memory copying of file chunks. + * On the receiver node, chunk data is stored in a DataInputStream, allowing direct writing + * to disk without the need for intermediate in-memory copying. + * + * This design eliminates the need for redundant byte buffer copies on both sender and receiver nodes, + * enhancing efficiency and reducing memory overhead. + */ +public class FileCopyGetChunkResponse2 extends Response { + private final PartitionId partitionId; + private final String fileName; + protected DataInputStream chunkDataOnReceiverNode; + protected ByteBuf chunkDataOnSenderNode; + private final long startOffset; + private final long chunkSizeInBytes; + private final boolean isLastChunk; + private static final int File_Name_Field_Size_In_Bytes = 4; + public static final short File_Copy_Chunk_Response_Version_V1 = 1; + + static short CURRENT_VERSION = File_Copy_Chunk_Response_Version_V1; + + public FileCopyGetChunkResponse2(short versionId, int correlationId, String clientId, ServerErrorCode errorCode, + PartitionId partitionId, String fileName, DataInputStream chunkDataOnReceiverNode, ByteBuf chunkDataOnSenderNode, + long startOffset, long chunkSizeInBytes, boolean isLastChunk) { + super(RequestOrResponseType.FileCopyGetChunkResponse, versionId, correlationId, clientId, errorCode); + + validateVersion(versionId); + + this.partitionId = partitionId; + this.fileName = fileName; + this.chunkDataOnReceiverNode = chunkDataOnReceiverNode; + this.chunkDataOnSenderNode = chunkDataOnSenderNode; + this.startOffset = startOffset; + this.chunkSizeInBytes = chunkSizeInBytes; + this.isLastChunk = isLastChunk; + } + + public FileCopyGetChunkResponse2(int correlationId, String clientId, ServerErrorCode errorCode) { + this(CURRENT_VERSION, correlationId, clientId, errorCode, + null, null, null, null, + -1, -1, false); + } + + public static FileCopyGetChunkResponse2 readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; + if (type != RequestOrResponseType.FileCopyGetChunkResponse) { + throw new IllegalArgumentException("The type of request response is not compatible"); + } + short versionId = stream.readShort(); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()]; + + if(errorCode != ServerErrorCode.No_Error){ + // Setting the partitionId and fileName to null as there are no logfiles to be read. + // Setting the startOffset and sizeInBytes to -1 as there are no logfiles to be read. 
+ return new FileCopyGetChunkResponse2(versionId, correlationId, clientId, errorCode, + null, null, null, null, + -1, -1, false); + } + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + String fileName = Utils.readIntString(stream); + long startOffset = stream.readLong(); + long sizeInBytes = stream.readLong(); + boolean isLastChunk = stream.readBoolean(); + return new FileCopyGetChunkResponse2(versionId, correlationId, clientId, errorCode, partitionId, fileName, stream, null, startOffset, sizeInBytes, isLastChunk); + } + + public long sizeInBytes() { + return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Field_Size_In_Bytes + + fileName.length() + Long.BYTES + Long.BYTES + 1 + Integer.BYTES + chunkDataOnSenderNode.readableBytes(); + } + + public void prepareBuffer(){ + CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(2); + super.prepareBuffer(); + bufferToSend.writeBytes(partitionId.getBytes()); + Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); + bufferToSend.writeLong(startOffset); + bufferToSend.writeLong(chunkSizeInBytes); + bufferToSend.writeBoolean(isLastChunk); + bufferToSend.writeInt(chunkDataOnSenderNode.readableBytes()); + compositeByteBuf.addComponent(true, bufferToSend); + if(chunkSizeInBytes > 0 ) { + compositeByteBuf.addComponent(true, chunkDataOnSenderNode); + } + bufferToSend = compositeByteBuf; + } + + public PartitionId getPartitionId() { + return partitionId; + } + + public String getFileName() { + return fileName; + } + + public long getStartOffset() { + return startOffset; + } + + public long getChunkSizeInBytes() { + return chunkSizeInBytes; + } + + public boolean isLastChunk() { + return isLastChunk; + } + + public DataInputStream getChunkDataOnReceiverNode() { + return chunkDataOnReceiverNode; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileCopyGetChunkResponse[") + .append("PartitionId=").append(partitionId.getId()) + .append(", FileName=").append(fileName) + .append(", SizeInBytes=").append(sizeInBytes()) + .append("]"); + return sb.toString(); + } + + private static void validateVersion(short version) { + if (version != CURRENT_VERSION) { + throw new IllegalArgumentException("Unknown version for FileCopyGetChunkResponse: " + version); + } + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java index ee87c3b236..20d4bf77f5 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java @@ -19,19 +19,23 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.charset.Charset; +import javax.annotation.Nonnull; + public class FileCopyGetMetaDataRequest extends RequestOrResponse{ - private PartitionId partitionId; - private String hostName; - private static final short File_Metadata_Request_Version_V1 = 1; + private final PartitionId partitionId; + private final String hostName; + public static final short File_Metadata_Request_Version_V1 = 1; private static final int HostName_Field_Size_In_Bytes = 4; - public FileCopyGetMetaDataRequest(short versionId, int correlationId, String clientId, - PartitionId partitionId, String hostName) { + public FileCopyGetMetaDataRequest( + short versionId, + int correlationId, + String clientId, + @Nonnull PartitionId 
partitionId, + @Nonnull String hostName) { super(RequestOrResponseType.FileCopyGetMetaDataRequest, versionId, correlationId, clientId); - if (partitionId == null) { - throw new IllegalArgumentException("Partition cannot be null"); - } + if (hostName.isEmpty()){ throw new IllegalArgumentException("Host Name cannot be null"); } @@ -47,20 +51,27 @@ public PartitionId getPartitionId() { return partitionId; } - protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { - Short versionId = stream.readShort(); + public static FileCopyGetMetaDataRequest readFrom( + @Nonnull DataInputStream stream, + @Nonnull ClusterMap clusterMap) throws IOException { + short versionId = stream.readShort(); validateVersion(versionId); + int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); String hostName = Utils.readIntString(stream); PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + return new FileCopyGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName); } public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName) - .append("]"); + sb.append("FileCopyGetMetaDataRequest[") + .append("PartitionId=") + .append(partitionId.getId()).append(", HostName=") + .append(hostName) + .append("]"); return sb.toString(); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java index d8ec80fdeb..f11d275c71 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java @@ -19,23 +19,33 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import javax.annotation.Nonnull; public class FileCopyGetMetaDataResponse extends Response { private final int numberOfLogfiles; private final List logInfos; - private static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1; + public static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1; + + static short CURRENT_VERSION = File_Copy_Protocol_Metadata_Response_Version_V1; public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles, List logInfos, ServerErrorCode errorCode) { super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode); + validateVersion(versionId); this.numberOfLogfiles = numberOfLogfiles; this.logInfos = logInfos; } - public static FileCopyGetMetaDataResponse readFrom(DataInputStream stream) throws IOException { + public FileCopyGetMetaDataResponse(int correlationId, String clientId, ServerErrorCode serverErrorCode) { + this(CURRENT_VERSION, correlationId, clientId, 0, new ArrayList<>(), serverErrorCode); + } + + public static FileCopyGetMetaDataResponse readFrom( + @Nonnull DataInputStream stream) throws IOException { RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; if (type != RequestOrResponseType.FileCopyGetMetaDataResponse) { throw new IllegalArgumentException("The type of request response is not compatible. 
Expected : {}, Actual : {}" + @@ -72,8 +82,12 @@ public long sizeInBytes() { public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append( - logInfos.toString()).append("]"); + sb + .append("FileCopyGetMetaDataResponse[NumberOfLogfiles=") + .append(numberOfLogfiles) + .append(", logInfoList") + .append(logInfos.toString()) + .append("]"); return sb.toString(); } @@ -82,7 +96,7 @@ public int getNumberOfLogfiles() { } public List getLogInfos() { - return logInfos; + return Collections.unmodifiableList(logInfos); } static void validateVersion(short version) { diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java index 020a8348f4..e6e1991591 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java @@ -18,6 +18,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.charset.Charset; +import javax.annotation.Nonnull; /** @@ -25,15 +26,15 @@ * by LogInfo as part of filecopy metadata request. */ public class FileInfo { - private String fileName; - private long fileSizeInBytes; + private final String fileName; + private final long fileSizeInBytes; private static final int FileName_Field_Size_In_Bytes = 4; - private static final int FileSize_Field_Size_In_Bytes = 8; - - public FileInfo(String fileName, long fileSize) { + public FileInfo( + @Nonnull String fileName, + @Nonnull long fileSize) { this.fileName = fileName; this.fileSizeInBytes = fileSize; } @@ -41,12 +42,14 @@ public FileInfo(String fileName, long fileSize) { public long sizeInBytes() { return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes; } - public static FileInfo readFrom(DataInputStream stream) throws IOException { + + public static FileInfo readFrom(@Nonnull DataInputStream stream) throws IOException { String fileName = Utils.readIntString(stream); long fileSize = stream.readLong(); return new FileInfo(fileName, fileSize); } - public void writeTo(ByteBuf buf) { + + public void writeTo(@Nonnull ByteBuf buf) { Utils.serializeString(buf, fileName, Charset.defaultCharset()); buf.writeLong(fileSizeInBytes); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java index fb8d131a9c..b1fc05a02f 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import javax.annotation.Nonnull; /** @@ -27,16 +29,25 @@ * by filecopy metadata request. 
*/ public class LogInfo { - private String fileName; - private long fileSizeInBytes; - List indexFiles; - List bloomFilters; + // TODO: Replace these fields with FileInfo + // private FileInfo fileInfo; + private final String fileName; + private final long fileSizeInBytes; + private final List indexFiles; + private final List bloomFilters; + + // TODO: Add isSealed prop + // private final boolean isSealed; private static final int FileName_Field_Size_In_Bytes = 4; private static final int FileSize_Field_Size_In_Bytes = 8; - private static final int ListSize_In_Bytes = 4; - public LogInfo(String fileName, long fileSizeInBytes, List indexFiles, List bloomFilters) { + + public LogInfo( + @Nonnull String fileName, + long fileSizeInBytes, + @Nonnull List indexFiles, + @Nonnull List bloomFilters) { this.fileName = fileName; this.fileSizeInBytes = fileSizeInBytes; this.indexFiles = indexFiles; @@ -52,11 +63,11 @@ public long getFileSizeInBytes() { } public List getBloomFilters() { - return bloomFilters; + return Collections.unmodifiableList(bloomFilters); } public List getIndexFiles() { - return indexFiles; + return Collections.unmodifiableList(indexFiles); } public long sizeInBytes() { @@ -71,7 +82,7 @@ public long sizeInBytes() { return size; } - public static LogInfo readFrom(DataInputStream stream) throws IOException { + public static LogInfo readFrom(@Nonnull DataInputStream stream) throws IOException { String fileName = Utils.readIntString(stream ); long fileSize = stream.readLong(); List listOfIndexFiles = new ArrayList<>(); @@ -89,7 +100,7 @@ public static LogInfo readFrom(DataInputStream stream) throws IOException { return new LogInfo(fileName, fileSize, listOfIndexFiles, listOfBloomFilters); } - public void writeTo(ByteBuf buf){ + public void writeTo(@Nonnull ByteBuf buf){ Utils.serializeString(buf, fileName, Charset.defaultCharset()); buf.writeLong(fileSizeInBytes); buf.writeInt(indexFiles.size()); @@ -105,15 +116,26 @@ public void writeTo(ByteBuf buf){ public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("LogInfo["); - sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(","); - for(FileInfo fileInfo : indexFiles) { - sb.append(fileInfo.toString()); + sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes); + + if(!indexFiles.isEmpty()) { + sb.append(", IndexFiles=["); + for (FileInfo fileInfo : indexFiles) { + sb.append(fileInfo.toString()); + } + sb.append("]"); + if(!bloomFilters.isEmpty()) { + sb.append(", "); + } } - for(FileInfo fileInfo: bloomFilters){ - sb.append(fileInfo.toString()); + if(!bloomFilters.isEmpty()){ + sb.append(" BloomFilters=["); + for(FileInfo fileInfo: bloomFilters){ + sb.append(fileInfo.toString()); + } + sb.append("]"); } sb.append("]"); return sb.toString(); } - -} +} \ No newline at end of file diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index a3d5553abc..e34aadd75e 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -15,6 +15,8 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import 
com.github.ambry.clustermap.ClusterMap; @@ -45,6 +47,7 @@ import com.github.ambry.network.RequestResponseChannel; import com.github.ambry.network.Send; import com.github.ambry.network.ServerNetworkResponseMetrics; +import com.github.ambry.network.ServerRequestResponseHelper; import com.github.ambry.notification.BlobReplicaSourceType; import com.github.ambry.notification.NotificationSystem; import com.github.ambry.notification.UpdateType; @@ -60,9 +63,15 @@ import com.github.ambry.protocol.CompositeSend; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetChunkRequest; +import com.github.ambry.protocol.FileCopyGetChunkResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; +import com.github.ambry.protocol.FileCopyGetMetaDataResponse; +import com.github.ambry.protocol.FileInfo; import com.github.ambry.protocol.GetOption; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; +import com.github.ambry.protocol.LogInfo; import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.protocol.PartitionResponseInfo; import com.github.ambry.protocol.PurgeRequest; @@ -87,6 +96,7 @@ import com.github.ambry.replication.FindToken; import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.ReplicationAPI; +import com.github.ambry.store.ChunkResponse; import com.github.ambry.store.FindInfo; import com.github.ambry.store.IdUndeletedStoreException; import com.github.ambry.store.Message; @@ -108,8 +118,11 @@ import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; import java.io.DataInputStream; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -234,6 +247,12 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce case ReplicateBlobRequest: handleReplicateBlobRequest(networkRequest); break; + case FileCopyGetMetaDataRequest: + handleFileCopyGetMetaDataRequest(networkRequest); + break; + case FileCopyGetChunkRequest: + handleFileCopyGetChunkRequest(networkRequest); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -1672,6 +1691,151 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } + /** + * Handler for FileMetadataRequest + */ + void handleFileCopyGetMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { + FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest = + FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + logger.info("Demo: Received Request " + fileCopyGetMetaDataRequest); + + List logInfos; + try { + List logSegments = + storeManager.getLogSegmentMetadataFiles(fileCopyGetMetaDataRequest.getPartitionId(), true); + logInfos = convertStoreToProtocolLogInfo(logSegments); + } catch (Exception e) { + logger.error("Demo: Error while getting log segment metadata for partition {}", + fileCopyGetMetaDataRequest.getPartitionId().getId(), e); + FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( + FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, + fileCopyGetMetaDataRequest.getCorrelationId(), 
fileCopyGetMetaDataRequest.getClientId(), + 0, new ArrayList<>(), ServerErrorCode.Unknown_Error); + requestResponseChannel.sendResponse(response, request, null); + return; + } + FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( + FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, + fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(), + logInfos.size(), logInfos, ServerErrorCode.No_Error); + logger.info("Demo: Sending Response " + response); + + // TODO: Add metrics for this operation + Histogram dummyHistogram = new Histogram(new Reservoir() { + @Override + public int size() { + return 0; + } + + @Override + public void update(long value) { + } + + @Override + public Snapshot getSnapshot() { + return null; + } + }); + ServerNetworkResponseMetrics serverNetworkResponseMetrics = new ServerNetworkResponseMetrics(dummyHistogram, + dummyHistogram, dummyHistogram, null, null, 0); + + requestResponseChannel.sendResponse(response, request, serverNetworkResponseMetrics); + } + + void handleFileCopyGetChunkRequest(NetworkRequest request) throws IOException, InterruptedException { + FileCopyGetChunkRequest fileCopyGetChunkRequest = + FileCopyGetChunkRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + logger.info("Demo: Received Request " + fileCopyGetChunkRequest); + + ChunkResponse chunkResponse; + FileCopyGetChunkResponse response; + try { + chunkResponse = storeManager.getChunk( + fileCopyGetChunkRequest.getPartitionId(), fileCopyGetChunkRequest.getFileName(), + fileCopyGetChunkRequest.getChunkLengthInBytes(), fileCopyGetChunkRequest.getStartOffset()); + } catch (Exception e) { + logger.error("Demo: Error while getting chunk for partition {}", fileCopyGetChunkRequest.getPartitionId().getId(), e); + + response = new FileCopyGetChunkResponse( + FileCopyGetChunkResponse.File_Copy_Chunk_Response_Version_V1, + fileCopyGetChunkRequest.getCorrelationId(), fileCopyGetChunkRequest.getClientId(), + ServerErrorCode.Unknown_Error, fileCopyGetChunkRequest.getPartitionId(), + fileCopyGetChunkRequest.getFileName(), null, + fileCopyGetChunkRequest.getStartOffset(), fileCopyGetChunkRequest.sizeInBytes(), false); + + requestResponseChannel.sendResponse(response, request, null); + return; + } + response = new FileCopyGetChunkResponse( + FileCopyGetChunkResponse.File_Copy_Chunk_Response_Version_V1, + fileCopyGetChunkRequest.getCorrelationId(), fileCopyGetChunkRequest.getClientId(), + ServerErrorCode.No_Error, fileCopyGetChunkRequest.getPartitionId(), + fileCopyGetChunkRequest.getFileName(), chunkResponse.getStream(), + fileCopyGetChunkRequest.getStartOffset(), chunkResponse.getChunkLength(), false); + logger.info("Demo: Sending Response " + response); + + // TODO: Add metrics for this operation + Histogram dummyHistogram = new Histogram(new Reservoir() { + @Override + public int size() { + return 0; + } + + @Override + public void update(long value) { + } + + @Override + public Snapshot getSnapshot() { + return null; + } + }); + ServerNetworkResponseMetrics serverNetworkResponseMetrics = new ServerNetworkResponseMetrics(dummyHistogram, + dummyHistogram, dummyHistogram, null, null, 0); + + requestResponseChannel.sendResponse(response, request, serverNetworkResponseMetrics); + } + + private List convertStoreToProtocolLogInfo(List logSegments) { + List logInfos = new ArrayList<>(); + for (com.github.ambry.store.LogInfo logSegment : logSegments) { + List indexSegments = new ArrayList<>(); + 
logSegment.getIndexSegments().forEach(indexSegment -> { + indexSegments.add(new FileInfo(indexSegment.getFileName(), indexSegment.getFileSize())); + }); + + List bloomFilters = new ArrayList<>(); + logSegment.getBloomFilters().forEach(bloomFilter -> { + bloomFilters.add(new FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSize())); + }); + + logInfos.add(new LogInfo( + logSegment.getLogSegment().getFileName(), logSegment.getLogSegment().getFileSize(), + indexSegments, bloomFilters)); + } + return logInfos; + } + + public static List convertProtocolToStoreLogInfo(List protocolLogInfos) { + List logInfos = new ArrayList<>(); + for (LogInfo protocolLogInfo : protocolLogInfos) { + List indexSegments = new ArrayList<>(); + protocolLogInfo.getIndexFiles().forEach(indexSegment -> { + indexSegments.add(new com.github.ambry.store.FileInfo(indexSegment.getFileName(), indexSegment.getFileSizeInBytes())); + }); + + List bloomFilters = new ArrayList<>(); + protocolLogInfo.getBloomFilters().forEach(bloomFilter -> { + bloomFilters.add(new com.github.ambry.store.FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSizeInBytes())); + }); + + logInfos.add(new com.github.ambry.store.LogInfo( + new com.github.ambry.store.FileInfo(protocolLogInfo.getFileName(), protocolLogInfo.getFileSizeInBytes()), + indexSegments, bloomFilters)); + } + return logInfos; + } + /** * Get the formatted messages which needs to be written to Store. * @param receivedRequest received Put Request diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 428bb55af2..37dcdd6e7e 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -23,7 +23,6 @@ import com.github.ambry.cloud.BackupIntegrityMonitor; import com.github.ambry.cloud.RecoveryManager; import com.github.ambry.cloud.RecoveryNetworkClientFactory; -import com.github.ambry.clustermap.AmbryDataNode; import com.github.ambry.clustermap.AmbryServerDataNode; import com.github.ambry.clustermap.ClusterAgentsFactory; import com.github.ambry.clustermap.ClusterMap; @@ -31,18 +30,21 @@ import com.github.ambry.clustermap.CompositeClusterManager; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.HelixClusterManager; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.StaticClusterManager; import com.github.ambry.clustermap.VcrClusterAgentsFactory; +import com.github.ambry.commons.BlobId; import com.github.ambry.commons.Callback; import com.github.ambry.commons.LoggingNotificationSystem; import com.github.ambry.commons.NettyInternalMetrics; import com.github.ambry.commons.NettySslHttp2Factory; import com.github.ambry.commons.SSLFactory; import com.github.ambry.commons.ServerMetrics; -import com.github.ambry.config.CloudConfig; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ConnectionPoolConfig; import com.github.ambry.config.DiskManagerConfig; +import com.github.ambry.config.FileCopyConfig; import com.github.ambry.config.Http2ClientConfig; import com.github.ambry.config.NettyConfig; import com.github.ambry.config.NetworkConfig; @@ -55,8 +57,11 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.messageformat.BlobStoreHardDelete; import com.github.ambry.messageformat.BlobStoreRecovery; +import 
com.github.ambry.messageformat.MessageFormatFlags; import com.github.ambry.network.BlockingChannelConnectionPool; +import com.github.ambry.network.ConnectedChannel; import com.github.ambry.network.ConnectionPool; +import com.github.ambry.network.ConnectionPoolTimeoutException; import com.github.ambry.network.LocalNetworkClientFactory; import com.github.ambry.network.LocalRequestResponseChannel; import com.github.ambry.network.NettyServerRequestResponseChannel; @@ -73,6 +78,10 @@ import com.github.ambry.network.http2.Http2NetworkClientFactory; import com.github.ambry.network.http2.Http2ServerMetrics; import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.protocol.GetOption; +import com.github.ambry.protocol.GetRequest; +import com.github.ambry.protocol.GetResponse; +import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.protocol.RequestHandlerPool; import com.github.ambry.repair.RepairRequestsDb; import com.github.ambry.repair.RepairRequestsDbFactory; @@ -86,17 +95,32 @@ import com.github.ambry.rest.ServerSecurityServiceFactory; import com.github.ambry.rest.StorageServerNettyFactory; import com.github.ambry.server.storagestats.AggregatedAccountStorageStats; +import com.github.ambry.store.FileInfo; +import com.github.ambry.store.FileStore; +import com.github.ambry.store.LogInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.StorageManager; +import com.github.ambry.store.StoreException; import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; @@ -104,7 +128,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; -import org.apache.logging.log4j.core.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,6 +177,9 @@ public class AmbryServer { private Thread repairThread = null; private RepairRequestsDb repairRequestsDb = null; private BackupIntegrityMonitor backupIntegrityMonitor = null; + private DataNodeId nodeId; + private FileStore fileStore; + public AmbryServer(VerifiableProperties properties, ClusterAgentsFactory clusterAgentsFactory, VcrClusterAgentsFactory vcrClusterAgentsFactory, Time time) throws InstantiationException { this(properties, clusterAgentsFactory, vcrClusterAgentsFactory, new LoggingNotificationSystem(), time, null); @@ -246,6 +272,7 @@ public void startup() throws InstantiationException { */ StoreKeyFactory storeKeyFactory = Utils.getObj(storeConfig.storeKeyFactory, clusterMap); DataNodeId nodeId = clusterMap.getDataNodeId(networkConfig.hostName, networkConfig.port); + this.nodeId = nodeId; if (nodeId == null) { throw new IllegalArgumentException(String.format("Node %s absent in cluster-map", networkConfig.hostName)); } @@ -314,6 +341,7 @@ public void startup() throws InstantiationException { scheduler = 
Utils.newScheduler(serverConfig.serverSchedulerNumOfthreads, false); logger.info("checking if node exists in clustermap host {} port {}", networkConfig.hostName, networkConfig.port); DataNodeId nodeId = clusterMap.getDataNodeId(networkConfig.hostName, networkConfig.port); + this.nodeId = nodeId; if (nodeId == null) { throw new IllegalArgumentException("The node " + networkConfig.hostName + ":" + networkConfig.port + "is not present in the clustermap. Failing to start the datanode"); @@ -478,12 +506,271 @@ public void startup() throws InstantiationException { long processingTime = SystemTime.getInstance().milliseconds() - startTime; metrics.serverStartTimeInMs.update(processingTime); logger.info("Server startup time in Ms {}", processingTime); + + FileCopyConfig fileCopyConfig = new FileCopyConfig(properties); + this.fileStore = new FileStore("dataDir", fileCopyConfig); + fileStore.start(); + + long startTimeMs = System.currentTimeMillis(); + testE2EFlow(); + logger.info("Demo: E2E flow took {} ms", System.currentTimeMillis() - startTimeMs); + +// testFileChunkAggregationForFileCopy(); +// testStateBuildPostFileCopy(); +// testFileStoreUtils(); } catch (Exception e) { logger.error("Error during startup", e); throw new InstantiationException("failure during startup " + e); } } + private void testE2EFlow() { + Optional optional = storageManager.getLocalPartitions().stream().filter(p -> p.getId() == 146).findFirst(); + PartitionId partitionId; + if (optional.isPresent()) { + partitionId = optional.get(); + } else { + logger.info("Demo: Partition not found"); + return; + } + Optional targetReplica = partitionId.getReplicaIds().stream().filter(replicaId -> + replicaId.getDataNodeId().getHostname().equals("ltx1-app3645.stg.linkedin.com")).findFirst(); + ReplicaId targetReplicaId; + if (targetReplica.isPresent()) { + targetReplicaId = targetReplica.get(); + } else { + logger.info("Demo: Target Replica not found"); + return; + } + Optional sourceReplica = partitionId.getReplicaIds().stream().filter(replicaId1 -> + replicaId1.getDataNodeId().getHostname().equals("ltx1-app3602.stg.linkedin.com")).findFirst(); + ReplicaId sourceReplicaId; + if (sourceReplica.isPresent()) { + sourceReplicaId = sourceReplica.get(); + } else { + logger.info("Demo: Source Replica not found"); + return; + } + + if (nodeId.getHostname().equals("ltx1-app3602.stg.linkedin.com")) { + logger.info("Demo: Source host. Initiating file copy based bootstrap..."); + + File directory = new File(sourceReplicaId.getReplicaPath()); + if (directory.exists() && directory.isDirectory()) { + for (File file : directory.listFiles()) { + if (file.isFile()) { + file.delete(); + } + } + logger.info("Demo: All files deleted."); + } else { + logger.info("Demo: Directory does not exist."); + } + try { + long startTimeMs = System.currentTimeMillis(); + new FileCopyHandler(connectionPool, fileStore, clusterMap).copy(partitionId, sourceReplicaId, targetReplicaId); + logger.info("Demo: FileCopyHandler took {} ms", System.currentTimeMillis() - startTimeMs); + + startTimeMs = System.currentTimeMillis(); + storageManager.buildStateForFileCopy(sourceReplicaId); + logger.info("Demo: buildStateForFileCopy took {} ms", System.currentTimeMillis() - startTimeMs); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else if (nodeId.getHostname().equals("ltx1-app3679.stg.linkedin.com")) { + logger.info("Demo: Tester host. 
Making GetBlob request..."); + while (true) { + try { + ConnectedChannel connectedChannel = + connectionPool.checkOutConnection(sourceReplicaId.getDataNodeId().getHostname(), sourceReplicaId.getDataNodeId().getPortToConnectTo(), 99999); + + long startTimeMs = System.currentTimeMillis(); + GetResponse response = getBlob(new BlobId("AAYAAgBoAAMAAQAAAAAAAACSsveRrYTJQfW46D8DJ6T0Pw", clusterMap), connectedChannel); + logger.info("Demo: TestGetBlob took {} ms", System.currentTimeMillis() - startTimeMs); + + logger.info("Demo: TestGetBlob response: {}", response); + response.getPartitionResponseInfoList().forEach(partitionResponseInfo -> { + partitionResponseInfo.getMessageInfoList().forEach(messageInfo -> { + logger.info("Demo: TestGetBlob message: {}", messageInfo); + }); + }); + if (response.getError().equals(ServerErrorCode.No_Error)) { + break; + } + } catch (Exception e) { + logger.error("Demo: TestGetBlob: Exception occurred", e); + } finally { + try { + Thread.sleep(5000); + } catch (InterruptedException e) {} + } + } + } + } + + private void testFileStoreUtils() throws StoreException, IOException { + // Testing FileStore utils + FileCopyConfig fileCopyConfig = new FileCopyConfig(properties); + FileStore fileStore = new FileStore("test", fileCopyConfig); + fileStore.start(); + List logInfoList = Collections.singletonList(new LogInfo(new FileInfo("0_log", 20000L), + Collections.singletonList(new FileInfo("0_index", 100L)), + Collections.singletonList(new FileInfo("0_bloom", 50L)))); + System.out.println("Persisting metadata" + logInfoList + " to file"); + fileStore.persistMetaDataToFile("/tmp/0/", logInfoList); + System.out.println("Reading metadata" + fileStore.readMetaDataFromFile("/tmp/0/") + " from file"); + String chunkPath = "/tmp/0/test_chunk"; + try (FileInputStream inputStream = new FileInputStream(chunkPath)) { + System.out.println("Trying putChunkToFile for chunk at " + chunkPath); + fileStore.putChunkToFile("/tmp/0/0_log", inputStream); + } catch (IOException e) { + System.err.println("An error occurred: " + e.getMessage()); + } + + int offset = 10; + int size = 10; + ByteBuffer byteBuffer = fileStore.readChunkForFileCopy("/tmp/0/", "0_log", offset, size); + System.out.println("Parsed log file contents read for offset=" + offset + ", size=" + size + " is: " + StandardCharsets.UTF_8.decode(byteBuffer)); + } + + private void testFileChunkAggregationForFileCopy() throws IOException { + String chunkPath = "/tmp/0/test_chunk"; // The path to the chunk file + String logFilePath = "/tmp/0/0_log"; // The path to the log file where chunks are written + String outputFilePath = "/tmp/0/output_log_copy"; // New file where the log data will be copied + + System.out.println("Testing file chunk aggregation for filecopy with Filestore"); + + int numChunksToWrite = 10; // Number of times the chunk should be written to the log file + int chunkSize = (int)Files.size(Paths.get(chunkPath)); // Size of the chunk + + Path path = Paths.get(logFilePath); + if (Files.exists(path)) { + // If the file exists, delete it + System.out.println("File exists. 
Deleting the file: " + logFilePath); + Files.delete(path); // Delete the existing file + } + System.out.println("Creating a new file: " + logFilePath); + Files.createFile(path); // Create a new file + + // Step 1: Write the chunk to the logFilePath multiple times + for (int i = 0; i < numChunksToWrite; i++) { + try (FileInputStream inputStream = new FileInputStream(chunkPath)) { + System.out.println("Trying to put chunk to file for chunk at " + chunkPath); + // Assuming fileStore.putChunkToFile() writes data from the input stream to the log file + fileStore.putChunkToFile(logFilePath, inputStream); + System.out.println("Written chunk " + (i + 1) + " to " + logFilePath); + } catch (IOException e) { + System.err.println("An error occurred while reading or writing the chunk: " + e.getMessage()); + } + } + + + // Step 2: Read from logFilePath chunk by chunk and write to a new file in the same directory + int offset = 0; + try (FileInputStream logInputStream = new FileInputStream(logFilePath); + FileOutputStream outputStream = new FileOutputStream(outputFilePath)) { + + byte[] buffer = new byte[chunkSize]; + int bytesRead; + while ((bytesRead = logInputStream.read(buffer)) != -1) { + // Write the chunk to the new file + outputStream.write(buffer, 0, bytesRead); + offset += bytesRead; + + System.out.println("Writing chunk to new file at offset " + offset); + } + + // Verify if contents of both files are same + byte[] content1 = Files.readAllBytes(Paths.get(logFilePath)); + byte[] content2 = Files.readAllBytes(Paths.get(outputFilePath)); + // Compare the byte arrays + if (Arrays.equals(content1, content2)) { + System.out.println("Input and output files are identical."); + } else { + System.out.println("Input and output files differ."); + } + System.out.println("File copy completed. Data written to " + outputFilePath); + } catch (IOException e) { + System.err.println("An error occurred while reading or writing the log file: " + e.getMessage()); + } + } + + + private void testStateBuildPostFileCopy() throws IOException, ConnectionPoolTimeoutException, InterruptedException { + String partitionName = "803"; + String logFilePath = "/tmp/803/14_0_log"; // The path to the log file where chunks are written + String outputFilePath = "/tmp/803/15_0_log"; // New file where the log data will be copied + int chunkSize = 100*1024*1024; // Size of the chunk + System.out.println("Testing state build post filecopy for partitionId " + partitionName); + + // Step 2: Read from logFilePath chunk by chunk and write to a new file in the same directory + int offset = 0; + try (FileInputStream logInputStream = new FileInputStream(logFilePath); + FileOutputStream outputStream = new FileOutputStream(outputFilePath)) { + + byte[] buffer = new byte[chunkSize]; + int bytesRead; + while ((bytesRead = logInputStream.read(buffer)) != -1) { + // Write the chunk to the new file + outputStream.write(buffer, 0, bytesRead); + offset += bytesRead; + + System.out.println("Writing chunk to new file at offset " + offset); + } + + // Verify if contents of both files are same + byte[] content1 = Files.readAllBytes(Paths.get(logFilePath)); + byte[] content2 = Files.readAllBytes(Paths.get(outputFilePath)); + // Compare the byte arrays + if (Arrays.equals(content1, content2)) { + System.out.println("Input and output files are identical."); + } else { + System.out.println("Input and output files differ."); + } + System.out.println("File copy completed. 
Data written to " + outputFilePath); + } catch (IOException e) { + System.err.println("An error occurred while reading or writing the log file: " + e.getMessage()); + } + + // Run state build on the aggregated output file by replacing original log file with copied file to see if the + // state is built correctly post filecopy + Files.delete(Paths.get(logFilePath)); + Files.move(Paths.get(outputFilePath), Paths.get(logFilePath)); + + System.out.println("Renamed log file: " + outputFilePath + " to " + logFilePath ); + + storageManager.buildStateForFileCopy(storageManager.getReplica(partitionName)); + System.out.println("State build successfully for partitionId: " + partitionName); + + // Perform getBlob operations on few blobs to verify is state is built correctly. + + List blobIdList = new ArrayList<>(2); + blobIdList.add(new BlobId("AAYQAgZEAAgAAQAAAAAAAAMja1b_H6RbSG2fzSHaZem-SA", clusterMap)); + blobIdList.add(new BlobId("AAYQAgZEAAgAAQAAAAAAAAMj48QxOzSxRoKbgGiP59OZFw", clusterMap)); + ConnectedChannel connectedChannel = + connectionPool.checkOutConnection("localhost", new Port(clusterMap.getDataNodeIds().get(0).getPort(), PortType.PLAINTEXT), + 5000); + for (BlobId blobId : blobIdList) { + System.out.println("Trying getBlob operation for blobId: " + blobId.getID()); + GetResponse getResponse = getBlob(blobId, connectedChannel); + System.out.println("BlobId: " + blobId.getID() + " found with GetResponse: " + getResponse); + } + } + + + /** + * Fetches a single blob from ambry server node + * @param blobId the {@link BlobId} that needs to be fetched + * @param connectedChannel the {@link ConnectedChannel} to use to send and receive data + * @throws IOException + */ + GetResponse getBlob(BlobId blobId, ConnectedChannel connectedChannel) throws IOException { + List partitionRequestInfoList = new ArrayList<>(); + partitionRequestInfoList.add(new PartitionRequestInfo(blobId.getPartition(), Collections.singletonList(blobId))); + GetRequest getRequest = new GetRequest(1, "client1", MessageFormatFlags.All, partitionRequestInfoList, GetOption.Include_All); + return GetResponse.readFrom(connectedChannel.sendAndReceive(getRequest).getInputStream(), clusterMap); + } + /** * This method is expected to be called in the exit path as long as the AmbryServer instance construction was * successful. This is expected to be called even if {@link #startup()} did not succeed. 
diff --git a/ambry-server/src/main/java/com/github/ambry/server/FileCopyHandler.java b/ambry-server/src/main/java/com/github/ambry/server/FileCopyHandler.java new file mode 100644 index 0000000000..449ebeeb68 --- /dev/null +++ b/ambry-server/src/main/java/com/github/ambry/server/FileCopyHandler.java @@ -0,0 +1,169 @@ +package com.github.ambry.server; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.commons.NettySslHttp2Factory; +import com.github.ambry.commons.SSLFactory; +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.config.ConnectionPoolConfig; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.config.Http2ClientConfig; +import com.github.ambry.config.SSLConfig; +import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.network.BlockingChannelConnectionPool; +import com.github.ambry.network.ChannelOutput; +import com.github.ambry.network.ConnectedChannel; +import com.github.ambry.network.ConnectionPool; +import com.github.ambry.network.ConnectionPoolTimeoutException; +import com.github.ambry.network.http2.Http2BlockingChannelPool; +import com.github.ambry.network.http2.Http2ClientMetrics; +import com.github.ambry.protocol.FileCopyGetChunkRequest; +import com.github.ambry.protocol.FileCopyGetChunkResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; +import com.github.ambry.protocol.FileCopyGetMetaDataResponse; +import com.github.ambry.protocol.FileInfo; +import com.github.ambry.store.FileStore; +import com.github.ambry.store.LogInfo; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FileCopyHandler { + private final ConnectionPool connectionPool; + private final FileStore fileStore; + private final ClusterMap clusterMap; + private final int CHUNK_SIZE = 20 * 1024 * 1024; // 20MB + + private static final Logger logger = LoggerFactory.getLogger(FileCopyHandler.class); + + public FileCopyHandler(ConnectionPool connectionPool, FileStore fileStore, ClusterMap clusterMap) { + this.clusterMap = clusterMap; + this.fileStore = fileStore; + this.connectionPool = connectionPool; + } + + // TODO fix this ctor + // Fails with :- java.lang.NoClassDefFoundError: com/github/ambry/server/FileCopyUtils + public FileCopyHandler(VerifiableProperties properties, ClusterMap clusterMap) throws Exception { + this.clusterMap = clusterMap; + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties); + ConnectionPoolConfig connectionPoolConfig = new ConnectionPoolConfig(properties); + MetricRegistry registry = clusterMap.getMetricRegistry(); + SSLConfig sslConfig = new SSLConfig(properties); + SSLFactory sslHttp2Factory = new NettySslHttp2Factory(sslConfig); + + if (clusterMapConfig.clusterMapEnableHttp2Replication) { + Http2ClientMetrics http2ClientMetrics = new Http2ClientMetrics(registry); + Http2ClientConfig http2ClientConfig = new Http2ClientConfig(properties); + connectionPool = new Http2BlockingChannelPool(sslHttp2Factory, http2ClientConfig, http2ClientMetrics); + } else { + connectionPool = new BlockingChannelConnectionPool(connectionPoolConfig, sslConfig, clusterMapConfig, 
registry); + } + FileCopyConfig fileCopyConfig = new FileCopyConfig(properties); + fileStore = new FileStore("dataDir", fileCopyConfig); + fileStore.start(); + } + + public void copy(PartitionId partitionId, ReplicaId sourceReplicaId, ReplicaId targetReplicaId) + throws IOException, ConnectionPoolTimeoutException, InterruptedException { + FileCopyGetMetaDataRequest request = new FileCopyGetMetaDataRequest( + FileCopyGetMetaDataRequest.File_Metadata_Request_Version_V1, 0, "", partitionId, "hostName"); + + logger.info("Demo: Request: {}", request); + long startTimeMs = System.currentTimeMillis(); + ConnectedChannel connectedChannel = + connectionPool.checkOutConnection(targetReplicaId.getDataNodeId().getHostname(), targetReplicaId.getDataNodeId().getPortToConnectTo(), 40); + ChannelOutput channelOutput = connectedChannel.sendAndReceive(request); + FileCopyGetMetaDataResponse response = FileCopyGetMetaDataResponse.readFrom(channelOutput.getInputStream()); + logger.info("Demo: FileCopyGetMetaDataRequest Api took {} ms", System.currentTimeMillis() - startTimeMs); + logger.info("Demo: Response: {}", response); + + List logInfos = AmbryRequests.convertProtocolToStoreLogInfo(response.getLogInfos()); + + String partitionFilePath = sourceReplicaId.getMountPath() + File.separator + partitionId.getId(); + fileStore.persistMetaDataToFile(partitionFilePath, logInfos); + + response.getLogInfos().forEach(logInfo -> { + logInfo.getIndexFiles().forEach(indexFile -> { + String filePath = partitionFilePath + "/" + indexFile.getFileName(); + try { + fetchAndPersistChunks(partitionId, targetReplicaId, clusterMap, indexFile, filePath, Long.MAX_VALUE, 0); + } catch (IOException | ConnectionPoolTimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + }); +// logInfo.getBloomFilters().forEach(bloomFile -> { +// String filePath = partitionFilePath + "/" + bloomFile.getFileName(); +// try { +// fetchAndPersistChunks(partitionId, targetReplicaId, clusterMap, bloomFile, filePath, Long.MAX_VALUE, 0); +// } catch (IOException | ConnectionPoolTimeoutException | InterruptedException e) { +// throw new RuntimeException(e); +// } +// }); + String filePath = partitionFilePath + "/" + logInfo.getFileName() + "_log"; + FileInfo logFileInfo = new FileInfo(logInfo.getFileName() + "_log", logInfo.getFileSizeInBytes()); + + int chunksInLogSegment = (int) Math.ceil((double) logFileInfo.getFileSizeInBytes() / CHUNK_SIZE); + logger.info("Demo: Total chunks in log segment: {}", chunksInLogSegment); + + for (int i = 0; i < chunksInLogSegment; i++) { + long startOffset = (long) i * CHUNK_SIZE; + long sizeInBytes = Math.min(CHUNK_SIZE, logFileInfo.getFileSizeInBytes() - startOffset); + logger.info("Demo: Fetching chunk {} for log segment: {} startOffset: {} sizeInBytes: {}", i+1, filePath, startOffset, sizeInBytes); + try { + fetchAndPersistChunks(partitionId, targetReplicaId, clusterMap, logFileInfo, filePath, sizeInBytes, startOffset); + } catch (IOException | ConnectionPoolTimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + private void fetchAndPersistChunks(PartitionId partitionId, ReplicaId replicaId, ClusterMap clusterMap, + FileInfo fileInfo, String filePath, long sizeInBytes, long startOffset) + throws IOException, ConnectionPoolTimeoutException, InterruptedException { + FileCopyGetChunkRequest request = new FileCopyGetChunkRequest( + FileCopyGetChunkRequest.File_Chunk_Request_Version_V1, 0, "", partitionId, + fileInfo.getFileName(), startOffset, 
sizeInBytes); + + logger.info("Demo: Request: {}", request); + long startTimeMs = System.currentTimeMillis(); + ConnectedChannel connectedChannel = + connectionPool.checkOutConnection(replicaId.getDataNodeId().getHostname(), replicaId.getDataNodeId().getPortToConnectTo(), 99999); + ChannelOutput chunkChannelOutput = connectedChannel.sendAndReceive(request); + FileCopyGetChunkResponse response = FileCopyGetChunkResponse.readFrom(chunkChannelOutput.getInputStream(), clusterMap); + logger.info("Demo: FileCopyGetChunkRequest Api took {} ms", System.currentTimeMillis() - startTimeMs); + logger.info("Demo: Response: {}", response); + + putChunkToFile(filePath, response.getChunkStream(), response.getChunkSizeInBytes()); + } + + private void putChunkToFile(String filePath, DataInputStream stream, long chunkSizeInBytes) throws IOException { + long startTimeMs = System.currentTimeMillis(); + if(!new File(filePath).exists()) { + Files.createFile(new File(filePath).toPath()); + } + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[(int) chunkSizeInBytes]; + int bytesRead; + while ((bytesRead = stream.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + } + Files.write(Paths.get(filePath), buffer, StandardOpenOption.CREATE, StandardOpenOption.APPEND); + logger.info("Demo: putChunkToFile took {} ms", System.currentTimeMillis() - startTimeMs); + logger.info("Demo: Write successful for chunk to file: " + filePath); + +// FileInputStream fileInputStream = new FileInputStream(String.valueOf(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))); +// fileStore.putChunkToFile(filePath, fileInputStream); + } +} \ No newline at end of file diff --git a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java index db08334f9a..f5c5413fee 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java @@ -64,6 +64,7 @@ import com.github.ambry.protocol.CatchupStatusAdminResponse; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; import com.github.ambry.protocol.GetOption; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; @@ -229,7 +230,8 @@ public AmbryServerRequestsTest(boolean validateRequestOnStoreState, boolean enab @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); +// return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); + return Arrays.asList(new Object[][]{{false, false}}); } private static Properties createProperties(boolean validateRequestOnStoreState, @@ -1645,6 +1647,17 @@ public void healthCheckTest() throws InterruptedException, StoreException, IOExc setPropertyToAmbryRequests(currentProperties, "disk.manager.disk.healthcheck.enabled", "false"); } + @Test + public void fileCopyGetMetaDataRequestTest() throws IOException, InterruptedException { + List partitionIds = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS); + + RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( + FileCopyGetMetaDataRequest.File_Metadata_Request_Version_V1, 0, "", + partitionIds.get(0), 
"hostName"); + + sendRequestGetResponse(request, ServerErrorCode.No_Error); + } + // helpers // general diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 3ef045aa82..b7e13b1dd9 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -15,6 +15,7 @@ import com.codahale.metrics.Timer; import com.github.ambry.account.AccountService; +import com.github.ambry.clustermap.FileStoreException; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.ReplicaSealStatus; import com.github.ambry.clustermap.ReplicaState; @@ -33,13 +34,20 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.RandomAccessFile; import java.io.SequenceInputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -56,6 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1317,6 +1326,105 @@ public void shutdown() throws StoreException { shutdown(false); } + /** + * Gets the log segment metadata files from in-memory data structures + * This method returns List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles + */ + List getLogSegmentMetadataFiles(boolean includeActiveLogSegment) { + List result = new ArrayList<>(); + + List sealedLogsAndMetaDataFiles = getLogSegments(includeActiveLogSegment); + if (null != sealedLogsAndMetaDataFiles) { + for (FileInfo E : sealedLogsAndMetaDataFiles) { + logger.info("[Dw] LS file: {} size: {}", E.getFileName(), E.getFileSize()); + + LogSegmentName logSegmentName = LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX); + + List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment(dataDir, logSegmentName); + if (null != allIndexSegmentsForLogSegment) { + for (FileInfo is : allIndexSegmentsForLogSegment) { + logger.info("[Dw] IS file: {} size: {}", is.getFileName(), is.getFileSize()); + } + } + List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment(dataDir, logSegmentName); + if (null != bloomFiltersForLogSegment) { + for (FileInfo bf : bloomFiltersForLogSegment) { + logger.info("[Dw] BF file: {} size: {}", bf.getFileName(), bf.getFileSize()); + } + } + result.add(new LogInfo(E, allIndexSegmentsForLogSegment, bloomFiltersForLogSegment)); + } + } + return result; + } + + /** + * Get all log segments in the store. + * Param includeActiveLogSegment is used to determine if the active log segment should be included in the result. 
+ */ + List getLogSegments(boolean includeActiveLogSegment){ + return log.getAllLogSegmentNames().stream() + .filter(segment -> includeActiveLogSegment || !segment.equals(log.getActiveSegment().getName())) + .filter(segment -> !segment.isSingleSegment()) + .map(segment -> log.getSegment(segment)) + .map(segment -> new FileInfo(segment.getName().toString(), segment.getView().getFirst().length())) + .collect(Collectors.toList()); + } + + /** + * Get all index segments for a log segment. + */ + List getAllIndexSegmentsForLogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getIndexSegmentFilesForLogSegment(dataDir, logSegmentName)) + .map(file -> new FileInfo(file.getName(), file.length())) + .collect(Collectors.toList()); + } + + /** + * Get all bloom filter files for a log segment. + */ + List getAllBloomFiltersForLogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getBloomFilterFiles(dataDir, logSegmentName)) + .map(file -> new FileInfo(file.getName(), file.length())) + .collect(Collectors.toList()); + } + + public ChunkResponse getStreamForFile(String fileName, long sizeInBytes, long startOffset) throws IOException { + if (startOffset == 0 && sizeInBytes == Long.MAX_VALUE) { + return getStreamForFile(fileName); + } + final File file = validateAndGetFile(fileName); + + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r"); + randomAccessFile.seek(startOffset); + ByteBuffer buf = ByteBuffer.allocate((int)sizeInBytes); + randomAccessFile.getChannel().read(buf); + buf.flip(); + + byte[] byteArray = new byte[buf.remaining()]; + buf.get(byteArray); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray); + + return new ChunkResponse(new DataInputStream(byteArrayInputStream), byteArray.length); + } + + private ChunkResponse getStreamForFile(String fileName) throws IOException { + final File file = validateAndGetFile(fileName); + return new ChunkResponse(new DataInputStream(Files.newInputStream(file.toPath())), file.length()); + } + + private File validateAndGetFile(String fileName) throws IOException { + String filePath = getDataDir() + File.separator + fileName; + File file = new File(filePath); + if (!file.exists()) { + throw new IOException("File doesn't exist: " + filePath); + } + if (!file.canRead()) { + throw new IOException("File cannot be read: " + filePath); + } + return file; + } + /** * Update the sealed status of the replica. */ diff --git a/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java b/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java index c0999d1013..83cc584b0d 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java @@ -103,7 +103,7 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName) PartitionStateChangeListener listenerToInvoke = null; if (null == replica) { - if (isFileCopyFeatureEnabled()) { + if (isFileCopyFeatureEnabled() && partitionName == "") { // "New partition -> FC" // This is a new partition placement and FileCopy bootstrap protocol is enabled. 
listenerToInvoke = fileCopyManagerListener; diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index d6412b0b31..576a8aa1f4 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -24,7 +24,9 @@ import com.github.ambry.utils.Throttler; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import java.io.DataInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; @@ -41,6 +43,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -52,6 +55,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,6 +192,11 @@ void start(boolean shouldRemoveUnexpectedDirs) throws InterruptedException { ConcurrentHashMap startExceptions = new ConcurrentHashMap<>(); List startupThreads = new ArrayList<>(); for (final Map.Entry partitionAndStore : stores.entrySet()) { + if(Objects.equals(146L, partitionAndStore.getKey().getId()) && + partitionToReplicaMap.get(partitionAndStore.getKey()).getDataNodeId().getHostname().equals("ltx1-app3602.stg.linkedin.com")) { + logger.info("Demo: Skip the store {} at bootstrapping node", partitionAndStore.getKey()); + continue; + } if (stoppedReplicas.contains(partitionAndStore.getKey().toPathString())) { logger.info("Skip the store {} because it is on the stopped list", partitionAndStore.getKey()); continue; @@ -813,15 +822,42 @@ public DiskHealthStatus getDiskHealthStatus() { return diskHealthStatus; } - public boolean isFileExists(String fileName) { - String filePath = this.disk.getMountPath() + File.separator + fileName; + /** + * Checks if the file exists on the disk + * @param fileName + * @return + */ + public boolean isFileExists(PartitionId partitionId, String fileName) { + String filePath = this.disk.getMountPath() + "/" + partitionId.toPathString() + File.separator + fileName; return new File(filePath).exists(); } + /** + * Gets the files for the given pattern from the disk + */ public List getFilesForPattern(Pattern pattern) throws IOException { return Utils.getFilesForPattern(this.disk.getMountPath(), pattern); } + /** + * Gets the log segment metadata files from in-memory data structures + * This method returns List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles + */ + List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) { + if (!stores.containsKey(partitionId)) { + throw new IllegalArgumentException("BlobStore for partition " + partitionId + " is not found on disk " + disk); + } + return stores.get(partitionId).getLogSegmentMetadataFiles(includeActiveLogSegment); + } + + public ChunkResponse getStreamForFile(PartitionId partitionId, String fileName, long sizeInBytes, long startOffset) + throws IOException { + if (!stores.containsKey(partitionId)) { + throw new IllegalArgumentException("BlobStore for partition " + partitionId + " is not found on disk " + disk); + } + return stores.get(partitionId).getStreamForFile(fileName, sizeInBytes, startOffset); 
+ } + /** Manages the disk healthchecking tasks such as creating,writing, reading, and deleting a file */ diff --git a/ambry-store/src/main/java/com/github/ambry/store/FileStore.java b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java index d3f36dee98..799005739a 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/FileStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java @@ -1,46 +1,257 @@ -/** - * Copyright 2024 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ package com.github.ambry.store; -import java.nio.channels.FileChannel; +import com.github.ambry.clustermap.FileStoreException; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.replication.FindToken; +import com.github.ambry.utils.CrcInputStream; +import com.github.ambry.utils.CrcOutputStream; +import com.github.ambry.utils.Pair; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import com.github.ambry.clustermap.FileStoreException.FileStoreErrorCode; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class FileStore { + private static final Logger logger = LoggerFactory.getLogger(FileStore.class); + private static boolean isRunning = false; + private final String dataDir; + private final FileMetadataSerde fileMetadataSerde; + private final FileCopyConfig fileCopyConfig; -/** - * This class is responsible for interactions with Disk as Part Of File Copy Protocol. - * It is responsible for reading and writing chunks and metadata to disk. - */ -class FileStore { - private boolean isRunning = false; - private ConcurrentHashMap fileNameToFileChannelMap; + public FileStore(String dataDir, FileCopyConfig fileCopyConfig){ + this.dataDir = dataDir; + this.fileMetadataSerde = new FileMetadataSerde(); + this.fileCopyConfig = fileCopyConfig; + } - public FileStore(String dataDir){ - fileNameToFileChannelMap = new ConcurrentHashMap<>(); + public void start() throws StoreException { + isRunning = true; + } + public boolean isRunning() { + return isRunning; + } + public void stop() { isRunning = false; } - void start() { - if(!isRunning) { - //Start the FileStore - isRunning = true; + + // TODO Moved to BlobStore as the bootstrapping node wouldn't have FileStore instantiated. 
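For orientation before the chunk and metadata methods below: a minimal sketch of how a FileStore instance might be driven, assuming defaults are acceptable for the file copy properties; the data directory used here is a placeholder and not part of this change.

import java.util.Properties;
import com.github.ambry.config.FileCopyConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.store.FileStore;

public class FileStoreLifecycleSketch {
  public static void main(String[] args) throws Exception {
    // Assumes FileCopyConfig falls back to its defaults for absent properties.
    FileCopyConfig fileCopyConfig = new FileCopyConfig(new VerifiableProperties(new Properties()));

    // Placeholder partition directory; on a server this would be <mountPath>/<partition>.
    FileStore fileStore = new FileStore("/mnt/u001/ambry/p146", fileCopyConfig);
    fileStore.start();
    try {
      // The chunk and metadata operations (readChunkForFileCopy, putChunkToFile,
      // persistMetaDataToFile, readMetaDataFromFile) are only legal while running;
      // otherwise they throw FileStoreException with FileStoreRunningFailure.
      System.out.println("FileStore running: " + fileStore.isRunning());
    } finally {
      fileStore.stop();
    }
  }
}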
+ /** + * Reads a chunk of {@code size} bytes starting at {@code offset} from the given file, for file copy. + */ + public ByteBuffer readChunkForFileCopy(String mountPath, String fileName, int offset, int size) + throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + String filePath = mountPath + "/" + fileName; + File file = new File(filePath); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + randomAccessFile.seek(offset); + ByteBuffer buf = ByteBuffer.allocate(size); + randomAccessFile.getChannel().read(buf); + buf.flip(); + return buf; + } } - void stop() { - //TODO: Implement shutdown Hook. - isRunning = false; + + /** + * Appends the bytes from {@code fileInputStream} to the file at {@code outputFilePath}. + */ + public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream) + throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + if(fileInputStream == null){ + throw new IllegalArgumentException("fileInputStream is null"); + } + // TODO: Handle edge cases and validations + + // Determine the number of bytes available in the stream + long fileSize = fileInputStream.available(); + + // Read all bytes from the source stream and append them to the output file + byte[] content = new byte[(int) fileSize]; + new DataInputStream(fileInputStream).readFully(content); // readFully guarantees the whole chunk is read + Files.write(Paths.get(outputFilePath), content, StandardOpenOption.CREATE, StandardOpenOption.APPEND); + + logger.info("Demo: Write successful for chunk to file: {}", outputFilePath); } - boolean isRunning() { - return isRunning; + + /** + * Persists the log segment metadata to the file copy metadata file under {@code mountPath}. + */ + public void persistMetaDataToFile(String mountPath, List<LogInfo> logInfoList) throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + if(logInfoList == null){ + throw new IllegalArgumentException("logInfoList is null"); + } + + File temp = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName + ".tmp"); + File actual = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName); + try { + FileOutputStream fileStream = new FileOutputStream(temp); + fileMetadataSerde.persist(logInfoList, fileStream); + logger.info("File copy metadata serialized and written to file: {}", actual.getAbsolutePath()); + // swap temp file with the original file + temp.renameTo(actual); + logger.debug("Completed writing file copy metadata to file {}", actual.getAbsolutePath()); + } catch (IOException e) { + logger.error("IO error while persisting file copy metadata to disk {}", temp.getAbsoluteFile(), e); + throw e; + } + } + + /** + * Reads the log segment metadata back from the file copy metadata file under {@code mountPath}. + * Returns an empty list if the file does not exist. + */ + public List<LogInfo> readMetaDataFromFile(String mountPath) throws IOException { + List<LogInfo> logInfoList = new ArrayList<>(); + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + + File fileCopyMetaDataFile = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName); + if (!fileCopyMetaDataFile.exists()) { + logger.info("fileCopyMetaDataFile {} not found", fileCopyMetaDataFile.getAbsolutePath()); + return logInfoList; + } + try { + FileInputStream fileStream = new FileInputStream(fileCopyMetaDataFile); + logger.info("Attempting to read file copy metadata from file: {}", fileCopyMetaDataFile.getAbsolutePath()); + logInfoList = fileMetadataSerde.retrieve(fileStream); + return logInfoList; + } catch (IOException e) { + logger.error("IO error while reading filecopy metadata from disk {}", fileCopyMetaDataFile.getAbsoluteFile(), e); + throw e; + } + } + + public void shutdown(){ + return; + } + + /** + * Class to serialize
and deserialize the file copy metadata (log segments with their index segment and bloom filter files) + */ + private static class FileMetadataSerde { + private static final short Crc_Size = 8; + private static final short VERSION_0 = 0; + private static final short CURRENT_VERSION = VERSION_0; + + public FileMetadataSerde() { + } + + /** + * Serialize the log segment metadata to the file + * @param logInfoList the list of log segment metadata entries (log segment, index segments, bloom filters) to persist + * @param outputStream the file output stream to write to + */ + public void persist(List<LogInfo> logInfoList, OutputStream outputStream) + throws IOException { + CrcOutputStream crcOutputStream = new CrcOutputStream(outputStream); + DataOutputStream writer = new DataOutputStream(crcOutputStream); + try { + + writer.writeInt(logInfoList.size()); + for (LogInfo logInfo : logInfoList) { + // write log segment size and name + writer.writeLong(logInfo.getLogSegment().getFileSize()); + writer.writeLong(logInfo.getLogSegment().getFileName().getBytes().length); + writer.write(logInfo.getLogSegment().getFileName().getBytes()); + writer.writeInt(logInfo.getIndexSegments().size()); + for(FileInfo fileInfo : logInfo.getIndexSegments()){ + writer.writeLong(fileInfo.getFileSize()); + writer.writeLong(fileInfo.getFileName().getBytes().length); + writer.write(fileInfo.getFileName().getBytes()); + } + writer.writeInt(logInfo.getBloomFilters().size()); + for(FileInfo fileInfo: logInfo.getBloomFilters()){ + writer.writeLong(fileInfo.getFileSize()); + writer.writeLong(fileInfo.getFileName().getBytes().length); + writer.write(fileInfo.getFileName().getBytes()); + } + } + + long crcValue = crcOutputStream.getValue(); + writer.writeLong(crcValue); + } catch (IOException e) { + logger.error("IO error while serializing file copy metadata", e); + throw e; + } finally { + if (outputStream instanceof FileOutputStream) { + // force the file contents to disk + ((FileOutputStream) outputStream).getChannel().force(true); + } + writer.close(); + } + } + + /** + * Deserialize the log segment metadata + * @param inputStream the input stream from the persisted file + * @return the list of log segment metadata entries read from the file, or an empty list on CRC mismatch or IO error + */ + public List<LogInfo> retrieve(InputStream inputStream) throws IOException { + List<LogInfo> logInfoList = new ArrayList<>(); + CrcInputStream crcStream = new CrcInputStream(inputStream); + DataInputStream stream = new DataInputStream(crcStream); + try { + while (stream.available() > Crc_Size) { + int logInfoListSize = stream.readInt(); + for(int i = 0; i < logInfoListSize; i++){ + // read log segment size and name + Long logSegmentSize = stream.readLong(); + byte[] logSegmentNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(logSegmentNameBytes); + String logSegmentName = new String(logSegmentNameBytes); + FileInfo logSegment = new FileInfo(logSegmentName, logSegmentSize); + // read index segments + int indexSegmentsSize = stream.readInt(); + List<FileInfo> indexSegments = new ArrayList<>(); + for(int j = 0; j < indexSegmentsSize; j++){ + Long fileSize = stream.readLong(); + byte[] indexSegmentNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(indexSegmentNameBytes); + String indexSegmentName = new String(indexSegmentNameBytes); + indexSegments.add(new FileInfo(indexSegmentName, fileSize)); + } + // read bloom filters + int bloomFiltersSize = stream.readInt(); + List<FileInfo> bloomFilters = new ArrayList<>(); + for(int j = 0; j < bloomFiltersSize; j++){ + Long fileSize = stream.readLong(); + byte[] bloomFilterNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(bloomFilterNameBytes); +
String bloomFilterName = new String(bloomFilterNameBytes); + bloomFilters.add(new FileInfo(bloomFilterName, fileSize)); + } + logInfoList.add(new LogInfo(logSegment, indexSegments, bloomFilters)); + } + } + + long computedCrc = crcStream.getValue(); + long readCrc = stream.readLong(); + if (computedCrc != readCrc) { + logger.error("Crc mismatch during file copy metadata deserialization, computed " + computedCrc + ", read " + readCrc); + return new ArrayList<>(); + } + return logInfoList; + } catch (IOException e) { + logger.error("IO error while deserializing file copy metadata", e); + return new ArrayList<>(); + } finally { + stream.close(); + } + } } } diff --git a/ambry-store/src/main/java/com/github/ambry/store/Log.java b/ambry-store/src/main/java/com/github/ambry/store/Log.java index 4d76814d1d..91b1e33d12 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/Log.java +++ b/ambry-store/src/main/java/com/github/ambry/store/Log.java @@ -142,6 +142,10 @@ public int appendFrom(ByteBuffer buffer) throws StoreException { return activeSegment.appendFrom(buffer); } + /** + * @return the active {@link LogSegment} to which new appends go. + */ + LogSegment getActiveSegment() { + return activeSegment; + } + /** * Appends the given {@code byteArray} to the active log segment in direct IO manner. * The {@code byteArray} will be written to a single log segment i.e. its data will not exist across segments. diff --git a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java index 0290013cf1..5d838cb3bd 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java +++ b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java @@ -2669,6 +2669,26 @@ public boolean accept(File dir, String name) { }); } + /** + * Gets the list of bloom filter files that refer to the log segment with name {@code logSegmentName}. + * @param dataDir the directory where the index files are. + * @param logSegmentName the name of the log segment whose bloom filter files are required. + * @return the list of bloom filter files that refer to the log segment with name {@code logSegmentName}. + */ + static File[] getBloomFilterFiles(String dataDir, LogSegmentName logSegmentName) { + return new File(dataDir).listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (logSegmentName.toString().isEmpty()) { + return name.endsWith(IndexSegment.BLOOM_FILE_NAME_SUFFIX); + } else { + return name.startsWith(logSegmentName + BlobStore.SEPARATOR) && + name.endsWith(IndexSegment.BLOOM_FILE_NAME_SUFFIX); + } + } + }); + } + /** * Cleans up all files related to index segments that refer to the log segment with name {@code logSegmentName}. * @param dataDir the directory where the index files are.
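The metadata file written by persistMetaDataToFile above uses a simple length-prefixed layout: an entry count, then for each log segment its size and name followed by its index segment and bloom filter entries, and a trailing CRC. A round-trip sketch under stated assumptions: LogInfo and FileInfo expose the constructors and getters used by the serde, the sketch lives in com.github.ambry.store (both classes appear to be package-local), and the mount path, file names and sizes are placeholders.

package com.github.ambry.store;

import java.util.Collections;
import java.util.List;

class FileCopyMetadataRoundTripSketch {
  // fileStore must already be started; mountPath is the directory that holds the metadata file.
  static void roundTrip(FileStore fileStore, String mountPath) throws Exception {
    // One sealed log segment with a single index segment and bloom filter (placeholder names/sizes).
    FileInfo logSegment = new FileInfo("0_log", 1073741824L);
    List<FileInfo> indexSegments = Collections.singletonList(new FileInfo("0_index", 1048576L));
    List<FileInfo> bloomFilters = Collections.singletonList(new FileInfo("0_bloom", 131072L));
    LogInfo logInfo = new LogInfo(logSegment, indexSegments, bloomFilters);

    // Writes to a .tmp file and renames it to the configured metadata file name.
    fileStore.persistMetaDataToFile(mountPath, Collections.singletonList(logInfo));

    // Reads the file back; an empty list comes back if the file is missing or the CRC does not match.
    List<LogInfo> restored = fileStore.readMetaDataFromFile(mountPath);
    System.out.println("Restored " + restored.size() + " log segment metadata entries");
  }
}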
diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index 1f1956082e..ef469fa266 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -36,9 +36,10 @@ import com.github.ambry.server.StoreManager; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import java.io.DataInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.FileStore; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -173,6 +174,31 @@ public StorageManager(StoreConfig storeConfig, DiskManagerConfig diskManagerConf } } + /** + * Gets the log segment metadata from in-memory data structures. + * This method returns the list of log segment files along with their index segment and bloom filter files. + */ + public List<LogInfo> getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) { + if (!partitionToDiskManager.containsKey(partitionId)) { + throw new IllegalArgumentException("DiskManager not found for partition " + partitionId); + } + return partitionToDiskManager.get(partitionId).getLogSegmentMetadataFiles(partitionId, includeActiveLogSegment); + } + + /** + * Get a chunk of the given file for the given {@link PartitionId}. + * This method returns a {@link ChunkResponse} wrapping a stream over the + * chunk of size {@code sizeInBytes} starting from {@code startOffset}. + */ + @Override + public ChunkResponse getChunk(PartitionId partitionId, String fileName, long sizeInBytes, long startOffset) + throws IOException { + if (!partitionToDiskManager.containsKey(partitionId)) { + throw new IllegalArgumentException("DiskManager not found for partition " + partitionId); + } + return partitionToDiskManager.get(partitionId).getStreamForFile(partitionId, fileName, sizeInBytes, startOffset); + } + /** * Checks whether the replicas are placed on the correct disks. If not, reshuffle the disks, write * the new state to Helix and exit. We assume that this ambry-server instance will then be restarted with the @@ -459,7 +485,7 @@ public boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) { @Override public boolean isFileExists(PartitionId partitionId, String fileName) { - return this.getDiskManager(partitionId).isFileExists(fileName); + return this.getDiskManager(partitionId).isFileExists(partitionId, fileName); } @Override @@ -580,10 +606,56 @@ public boolean addBlobStore(ReplicaId replica) { * @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built */ @Override - public boolean addFileStore(ReplicaId replicaId) { - //TODO: Implementation To Be added.
- return false; + public boolean addFileStore(ReplicaId replica) { + if (partitionToDiskManager.containsKey(replica.getPartitionId())) { + logger.info("{} already exists in storage manager, rejecting adding store request", replica.getPartitionId()); + return false; + } + DiskManager diskManager = addDisk(replica.getDiskId()); + if (diskManager == null || !diskManager.addBlobStore(replica)) { + logger.error("Failed to add new store into DiskManager"); + return false; + } + partitionToDiskManager.put(replica.getPartitionId(), diskManager); + partitionNameToReplicaId.put(replica.getPartitionId().toPathString(), replica); + logger.info("New store is successfully added into StorageManager"); + return true; } + + @Override + public void setUpReplica(String partitionName) { + ReplicaId replica = partitionNameToReplicaId.get(partitionName); + if (replica == null) { + ReplicaId replicaToAdd; + boolean replicaAdded = false; + do { + // there can be two scenarios: + // 1. this is the first time to add new replica onto current node; + // 2. last replica addition failed at some point before updating InstanceConfig in Helix + // In either case, we should add replica to current node by calling "addFileStore(ReplicaId replica)" + replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode); + if (replicaToAdd == null) { + logger.error("No new replica found for partition {} in cluster map", partitionName); + throw new StateTransitionException( + "New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound); + } + // Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make + // sure old store of this replica is deleted (this store may be created in previous replica addition but failed + // at some point). Then a brand new store associated with this replica should be created and started. + if (!addFileStore(replicaToAdd)) { + // We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it + // back since addition of store failed. + replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes()); + + logger.info("Failed to add store {} at location {}. Retrying bootstrapping replica at different location", + partitionName, replicaToAdd.getReplicaPath()); + } else { + replicaAdded = true; + } + } while (!replicaAdded); + } + } + public void buildStateForFileCopy(ReplicaId replica){ if (replica == null) { logger.error("ReplicaId is null"); diff --git a/build.gradle b/build.gradle index ea929a848c..06edbc7f7a 100644 --- a/build.gradle +++ b/build.gradle @@ -334,6 +334,7 @@ project (':ambry-prioritization') { project (':ambry-file-transfer') { dependencies{ compile project(':ambry-api') + compile project(':ambry-protocol') compile project(':ambry-commons') compile project(':ambry-store') compile project(':ambry-prioritization')
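On the serving side, the new StorageManager entry points compose roughly as below. This is a hedged sketch, not part of the patch: the partition handle and chunk size are placeholders, and the ChunkResponse accessors are left out because that class is defined elsewhere in this change.

package com.github.ambry.store;

import com.github.ambry.clustermap.PartitionId;
import java.io.IOException;
import java.util.List;

class FileCopyServingSketch {
  // storageManager is assumed to be started and to host partitionId on one of its disks.
  static void serveMetadataAndFirstChunks(StorageManager storageManager, PartitionId partitionId)
      throws IOException {
    // Sealed log segments only; pass true to also include the active (still mutable) segment.
    List<LogInfo> logInfos = storageManager.getLogSegmentMetadataFiles(partitionId, false);
    for (LogInfo logInfo : logInfos) {
      String fileName = logInfo.getLogSegment().getFileName();
      long fileSize = logInfo.getLogSegment().getFileSize();
      long chunkSize = Math.min(fileSize, 4 * 1024 * 1024L); // placeholder chunk size
      // Streams bytes [0, chunkSize) of the log segment via DiskManager.getStreamForFile.
      ChunkResponse chunk = storageManager.getChunk(partitionId, fileName, chunkSize, 0);
      // The ChunkResponse (stream plus length) would be handed to the file copy transport.
    }
  }
}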