From 3c5701ba8ab535e18433f8c27ac6cb899bd9de18 Mon Sep 17 00:00:00 2001 From: DevenAhluwalia Date: Thu, 9 Jan 2025 19:44:16 +0530 Subject: [PATCH 1/6] WIP --- .../github/ambry/store/SealedFileInfo.java | 32 +++++++++++++ .../ambry/protocol/RequestOrResponseType.java | 4 +- .../github/ambry/server/AmbryRequests.java | 21 +++++++++ .../com/github/ambry/store/BlobStore.java | 47 +++++++++++++++++++ .../main/java/com/github/ambry/store/Log.java | 4 ++ .../github/ambry/store/PersistentIndex.java | 20 ++++++++ 6 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java diff --git a/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java b/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java new file mode 100644 index 0000000000..e16bd790a2 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java @@ -0,0 +1,32 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.store; + + +class SealedFileInfo { + private final String fileName; + private final long fileSize; + + public SealedFileInfo(String fileName, Long fileSize) { + this.fileName = fileName; + this.fileSize = fileSize; + } + public String getFileName() { + return fileName; + } + + public Long getFileSize() { + return fileSize; + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index 5d803a45eb..801fa27168 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -41,5 +41,7 @@ public enum RequestOrResponseType { FileCopyGetChunkRequest, FileCopyGetChunkResponse, FileCopyGetMetaDataRequest, - FileCopyGetMetaDataResponse + FileCopyGetMetaDataResponse, + FileMetaDataRequest, + FileChunkRequest } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index a3d5553abc..a23dba79f7 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -234,6 +234,12 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce case ReplicateBlobRequest: handleReplicateBlobRequest(networkRequest); break; + case FileMetaDataRequest: + handleFileMetaDataRequest(networkRequest); + break; + case FileChunkRequest: + handleFileChunkRequest(networkRequest); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -1672,6 +1678,21 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } + /** + * + * @param networkRequest + */ + void handleFileChunkRequest(NetworkRequest networkRequest) { + } + + /** + * + * @param networkRequest + */ + void 
handleFileMetaDataRequest(NetworkRequest networkRequest) { + throw new UnsupportedOperationException("Not implemented yet"); + } + /** * Get the formatted messages which needs to be written to Store. * @param receivedRequest received Put Request diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 3ef045aa82..5d7c0a726b 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -40,6 +40,7 @@ import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -56,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -252,6 +254,31 @@ public BlobStore(ReplicaId replicaId, StoreConfig config, ScheduledExecutorServi @Override public void start() throws StoreException { + List sealedLogsAndMetaDataFiles = getSealedLogSegmentFiles(); + if (null != sealedLogsAndMetaDataFiles) { + for (SealedFileInfo E : sealedLogsAndMetaDataFiles) { + System.out.println(E.getFileName() + ", size: " + E.getFileSize()); + logger.info("LS file: {} size: {}", E.getFileName(), E.getFileSize()); + + System.out.print(""); + List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment(dataDir, LogSegmentName.fromFilename(E.getFileName())); + if (null != allIndexSegmentsForLogSegment) { + for (SealedFileInfo is : allIndexSegmentsForLogSegment) { + System.out.println(is.getFileName() + ", size: " + is.getFileSize()); + logger.info("IS file: {} size: {}", is.getFileName(), is.getFileSize()); + } + } + System.out.print(""); + List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment(dataDir, LogSegmentName.fromFilename(E.getFileName())); + if (null != bloomFiltersForLogSegment) { + for (SealedFileInfo bf : bloomFiltersForLogSegment) { + System.out.println(bf.getFileName() + ", size: " + bf.getFileSize()); + logger.info("BF file: {} size: {}", bf.getFileName(), bf.getFileSize()); + } + } + } + } + synchronized (storeWriteLock) { if (started) { throw new StoreException("Store already started", StoreErrorCodes.Store_Already_Started); @@ -1317,6 +1344,26 @@ public void shutdown() throws StoreException { shutdown(false); } + List getSealedLogSegmentFiles(){ + return log.getAllLogSegmentNames().stream() + .filter(segment -> log.getActiveSegment().getName() != segment) + .map(segment -> log.getSegment(segment)) + .map(segment -> new SealedFileInfo(segment.getName().toString(), segment.getView().getFirst().length())) + .collect(Collectors.toList()); + } + + List getAllIndexSegmentsForLogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getIndexSegmentFilesForLogSegment(dataDir, logSegmentName)) + .map(file -> new SealedFileInfo(file.getName(), file.length())) + .collect(Collectors.toList()); + } + + List getAllBloomFiltersForLogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getBloomFilterFiles(dataDir, logSegmentName)) + .map(file -> new SealedFileInfo(file.getName(), file.length())) + .collect(Collectors.toList()); + } + /** * Update the sealed status of the replica. 
*/ diff --git a/ambry-store/src/main/java/com/github/ambry/store/Log.java b/ambry-store/src/main/java/com/github/ambry/store/Log.java index 4d76814d1d..91b1e33d12 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/Log.java +++ b/ambry-store/src/main/java/com/github/ambry/store/Log.java @@ -142,6 +142,10 @@ public int appendFrom(ByteBuffer buffer) throws StoreException { return activeSegment.appendFrom(buffer); } + LogSegment getActiveSegment() { + return activeSegment; + } + /** * Appends the given {@code byteArray} to the active log segment in direct IO manner. * The {@code byteArray} will be written to a single log segment i.e. its data will not exist across segments. diff --git a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java index 0290013cf1..5d838cb3bd 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java +++ b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java @@ -2669,6 +2669,26 @@ public boolean accept(File dir, String name) { }); } + /** + * Gets the list of bloom filter files that refer to the log segment with name {@code logSegmentName}. + * @param dataDir the directory where the index files are. + * @param logSegmentName the name of the log segment whose bloom filter files are required. + * @return the list of bloom filter files that refer to the log segment with name {@code logSegmentName}. + */ + static File[] getBloomFilterFiles(String dataDir, LogSegmentName logSegmentName) { + return new File(dataDir).listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (logSegmentName.toString().isEmpty()) { + return name.endsWith(IndexSegment.BLOOM_FILE_NAME_SUFFIX); + } else { + return name.startsWith(logSegmentName + BlobStore.SEPARATOR) && + name.endsWith(IndexSegment.BLOOM_FILE_NAME_SUFFIX); + } + } + }); + } + /** * Cleans up all files related to index segments that refer to the log segment with name {@code logSegmentName}. * @param dataDir the directory where the index files are. 
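[Editor's note — illustration only, not part of the patch series]

Patch 1/6 introduces per-log-segment file discovery: BlobStore.getSealedLogSegmentFiles() enumerates sealed log segments, and the new PersistentIndex.getBloomFilterFiles(dataDir, logSegmentName) filters the store's data directory for bloom-filter files belonging to one segment, mirroring the existing getIndexSegmentFilesForLogSegment pattern (name prefix = segment name plus a separator, name suffix = the bloom-filter file suffix). The standalone Java sketch below shows that filtering idea in isolation; the "_" separator, the "_bloom" suffix, and the example directory are illustrative assumptions, not Ambry's actual constants.

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of per-log-segment file filtering, analogous to the new
// PersistentIndex#getBloomFilterFiles helper. SEPARATOR and BLOOM_SUFFIX are
// assumed placeholder values, not the project's real constants.
public class BloomFilterFileLister {
  private static final String SEPARATOR = "_";          // assumed separator between segment name and the rest of the file name
  private static final String BLOOM_SUFFIX = "_bloom";  // assumed bloom-filter file suffix

  // Lists bloom-filter files in dataDir that belong to logSegmentName.
  // An empty segment name matches every bloom-filter file (the single-segment case).
  static List<File> bloomFilterFilesFor(String dataDir, String logSegmentName) {
    File[] files = new File(dataDir).listFiles((dir, name) ->
        logSegmentName.isEmpty()
            ? name.endsWith(BLOOM_SUFFIX)
            : name.startsWith(logSegmentName + SEPARATOR) && name.endsWith(BLOOM_SUFFIX));
    // listFiles returns null when dataDir does not exist or is not a directory.
    return files == null ? new ArrayList<>() : new ArrayList<>(Arrays.asList(files));
  }

  public static void main(String[] args) {
    // Hypothetical store directory and log segment name, for demonstration only.
    bloomFilterFilesFor("/tmp/store", "0_1")
        .forEach(f -> System.out.println(f.getName() + ", size: " + f.length()));
  }
}

One note on this sketch: it returns an empty list instead of a possibly-null array, which spares callers (such as a stream pipeline like the patch's getAllBloomFiltersForLogSegment) from an explicit null check.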
From 2ad86923ed26ef86d805fc7d361b6c4c145a61a4 Mon Sep 17 00:00:00 2001 From: DevenAhluwalia Date: Tue, 14 Jan 2025 15:53:32 +0530 Subject: [PATCH 2/6] WIP --- .../{SealedFileInfo.java => FileInfo.java} | 5 +- .../java/com/github/ambry/store/LogInfo.java | 45 ++++++++++++ .../protocol/FileCopyGetMetaDataRequest.java | 22 +++--- .../protocol/FileCopyGetMetaDataResponse.java | 7 +- .../com/github/ambry/protocol/FileInfo.java | 17 +++-- .../com/github/ambry/protocol/LogInfo.java | 34 +++++---- .../ambry/protocol/RequestOrResponseType.java | 3 +- .../github/ambry/server/AmbryRequests.java | 65 ++++++++++++++--- .../com/github/ambry/server/AmbryServer.java | 10 ++- .../ambry/server/AmbryServerRequestsTest.java | 18 +++-- .../com/github/ambry/store/BlobStore.java | 70 ++++++++++--------- .../com/github/ambry/store/DiskManager.java | 23 ++++++ .../github/ambry/store/StorageManager.java | 11 +++ 13 files changed, 246 insertions(+), 84 deletions(-) rename ambry-api/src/main/java/com/github/ambry/store/{SealedFileInfo.java => FileInfo.java} (90%) create mode 100644 ambry-api/src/main/java/com/github/ambry/store/LogInfo.java diff --git a/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java b/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java similarity index 90% rename from ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java rename to ambry-api/src/main/java/com/github/ambry/store/FileInfo.java index e16bd790a2..700c99faff 100644 --- a/ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java +++ b/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java @@ -14,14 +14,15 @@ package com.github.ambry.store; -class SealedFileInfo { +public class FileInfo { private final String fileName; private final long fileSize; - public SealedFileInfo(String fileName, Long fileSize) { + public FileInfo(String fileName, Long fileSize) { this.fileName = fileName; this.fileSize = fileSize; } + public String getFileName() { return fileName; } diff --git a/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java b/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java new file mode 100644 index 0000000000..6a2da09e30 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java @@ -0,0 +1,45 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package com.github.ambry.store; + +import java.util.Collections; +import java.util.List; + + +public class LogInfo { + private final FileInfo logSegment; + private final List indexSegments; + private final List bloomFilters; + + public LogInfo(FileInfo logSegment, List indexSegments, List bloomFilters) { + this.logSegment = logSegment; + this.indexSegments = indexSegments; + this.bloomFilters = bloomFilters; + } + + // TODO: Add isSealed prop + // private final boolean isSealed; + + public FileInfo getLogSegment() { + return logSegment; + } + + public List getIndexSegments() { + return Collections.unmodifiableList(indexSegments); + } + + public List getBloomFilters() { + return Collections.unmodifiableList(bloomFilters); + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java index ee87c3b236..f851de6af8 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java @@ -19,19 +19,23 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.charset.Charset; +import javax.annotation.Nonnull; + public class FileCopyGetMetaDataRequest extends RequestOrResponse{ - private PartitionId partitionId; - private String hostName; + private final PartitionId partitionId; + private final String hostName; private static final short File_Metadata_Request_Version_V1 = 1; private static final int HostName_Field_Size_In_Bytes = 4; - public FileCopyGetMetaDataRequest(short versionId, int correlationId, String clientId, - PartitionId partitionId, String hostName) { + public FileCopyGetMetaDataRequest( + short versionId, + int correlationId, + String clientId, + @Nonnull PartitionId partitionId, + @Nonnull String hostName) { super(RequestOrResponseType.FileCopyGetMetaDataRequest, versionId, correlationId, clientId); - if (partitionId == null) { - throw new IllegalArgumentException("Partition cannot be null"); - } + if (hostName.isEmpty()){ throw new IllegalArgumentException("Host Name cannot be null"); } @@ -47,7 +51,9 @@ public PartitionId getPartitionId() { return partitionId; } - protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + public static FileCopyGetMetaDataRequest readFrom( + @Nonnull DataInputStream stream, + @Nonnull ClusterMap clusterMap) throws IOException { Short versionId = stream.readShort(); validateVersion(versionId); int correlationId = stream.readInt(); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java index d8ec80fdeb..3b0cd3311d 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import javax.annotation.Nonnull; public class FileCopyGetMetaDataResponse extends Response { @@ -35,7 +37,8 @@ public FileCopyGetMetaDataResponse(short versionId, int correlationId, String cl this.logInfos = logInfos; } - public static FileCopyGetMetaDataResponse readFrom(DataInputStream stream) throws IOException 
{ + public static FileCopyGetMetaDataResponse readFrom( + @Nonnull DataInputStream stream) throws IOException { RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; if (type != RequestOrResponseType.FileCopyGetMetaDataResponse) { throw new IllegalArgumentException("The type of request response is not compatible. Expected : {}, Actual : {}" + @@ -82,7 +85,7 @@ public int getNumberOfLogfiles() { } public List getLogInfos() { - return logInfos; + return Collections.unmodifiableList(logInfos); } static void validateVersion(short version) { diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java index 020a8348f4..e6e1991591 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java @@ -18,6 +18,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.charset.Charset; +import javax.annotation.Nonnull; /** @@ -25,15 +26,15 @@ * by LogInfo as part of filecopy metadata request. */ public class FileInfo { - private String fileName; - private long fileSizeInBytes; + private final String fileName; + private final long fileSizeInBytes; private static final int FileName_Field_Size_In_Bytes = 4; - private static final int FileSize_Field_Size_In_Bytes = 8; - - public FileInfo(String fileName, long fileSize) { + public FileInfo( + @Nonnull String fileName, + @Nonnull long fileSize) { this.fileName = fileName; this.fileSizeInBytes = fileSize; } @@ -41,12 +42,14 @@ public FileInfo(String fileName, long fileSize) { public long sizeInBytes() { return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes; } - public static FileInfo readFrom(DataInputStream stream) throws IOException { + + public static FileInfo readFrom(@Nonnull DataInputStream stream) throws IOException { String fileName = Utils.readIntString(stream); long fileSize = stream.readLong(); return new FileInfo(fileName, fileSize); } - public void writeTo(ByteBuf buf) { + + public void writeTo(@Nonnull ByteBuf buf) { Utils.serializeString(buf, fileName, Charset.defaultCharset()); buf.writeLong(fileSizeInBytes); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java index fb8d131a9c..c8e337bdb3 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import javax.annotation.Nonnull; /** @@ -27,16 +29,25 @@ * by filecopy metadata request. 
*/ public class LogInfo { - private String fileName; - private long fileSizeInBytes; - List indexFiles; - List bloomFilters; + // TODO: Replace these fields with FileInfo + // private FileInfo fileInfo; + private final String fileName; + private final long fileSizeInBytes; + private final List indexFiles; + private final List bloomFilters; + + // TODO: Add isSealed prop + // private final boolean isSealed; private static final int FileName_Field_Size_In_Bytes = 4; private static final int FileSize_Field_Size_In_Bytes = 8; - private static final int ListSize_In_Bytes = 4; - public LogInfo(String fileName, long fileSizeInBytes, List indexFiles, List bloomFilters) { + + public LogInfo( + @Nonnull String fileName, + long fileSizeInBytes, + @Nonnull List indexFiles, + @Nonnull List bloomFilters) { this.fileName = fileName; this.fileSizeInBytes = fileSizeInBytes; this.indexFiles = indexFiles; @@ -52,11 +63,11 @@ public long getFileSizeInBytes() { } public List getBloomFilters() { - return bloomFilters; + return Collections.unmodifiableList(bloomFilters); } public List getIndexFiles() { - return indexFiles; + return Collections.unmodifiableList(indexFiles); } public long sizeInBytes() { @@ -71,7 +82,7 @@ public long sizeInBytes() { return size; } - public static LogInfo readFrom(DataInputStream stream) throws IOException { + public static LogInfo readFrom(@Nonnull DataInputStream stream) throws IOException { String fileName = Utils.readIntString(stream ); long fileSize = stream.readLong(); List listOfIndexFiles = new ArrayList<>(); @@ -89,7 +100,7 @@ public static LogInfo readFrom(DataInputStream stream) throws IOException { return new LogInfo(fileName, fileSize, listOfIndexFiles, listOfBloomFilters); } - public void writeTo(ByteBuf buf){ + public void writeTo(@Nonnull ByteBuf buf){ Utils.serializeString(buf, fileName, Charset.defaultCharset()); buf.writeLong(fileSizeInBytes); buf.writeInt(indexFiles.size()); @@ -115,5 +126,4 @@ public String toString(){ sb.append("]"); return sb.toString(); } - -} +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index 801fa27168..f58154b554 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -42,6 +42,5 @@ public enum RequestOrResponseType { FileCopyGetChunkResponse, FileCopyGetMetaDataRequest, FileCopyGetMetaDataResponse, - FileMetaDataRequest, - FileChunkRequest + FileMetaDataRequest } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index a23dba79f7..073fbb5ddc 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -60,9 +60,11 @@ import com.github.ambry.protocol.CompositeSend; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataResponse; import com.github.ambry.protocol.GetOption; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; +import com.github.ambry.protocol.LogInfo; import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.protocol.PartitionResponseInfo; import 
com.github.ambry.protocol.PurgeRequest; @@ -92,6 +94,7 @@ import com.github.ambry.store.Message; import com.github.ambry.store.MessageErrorInfo; import com.github.ambry.store.MessageInfo; +import com.github.ambry.store.StorageManager; import com.github.ambry.store.Store; import com.github.ambry.store.StoreBatchDeleteInfo; import com.github.ambry.store.StoreErrorCodes; @@ -237,9 +240,6 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce case FileMetaDataRequest: handleFileMetaDataRequest(networkRequest); break; - case FileChunkRequest: - handleFileChunkRequest(networkRequest); - break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -1680,17 +1680,60 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In /** * - * @param networkRequest + * @param request */ - void handleFileChunkRequest(NetworkRequest networkRequest) { + void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { +// FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest = +// FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + + List logSegments = ((StorageManager)storeManager).getLogSegmentMetadataFiles(); + List logInfos = convertStoreToProtocolLogInfo(logSegments); + + FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse((short)0, 0, "", + logSegments.size(), logInfos, ServerErrorCode.No_Error); +// Histogram dummyHistogram = new Histogram(new Reservoir() { +// @Override +// public int size() { +// return 0; +// } +// +// @Override +// public void update(long value) { +// } +// +// @Override +// public Snapshot getSnapshot() { +// return null; +// } +// }); + + logger.info("[Dw] Api response - " + response); + System.out.println("[Dw] Api response - " + response); + +// requestResponseChannel.sendResponse( +// response, request, +// new ServerNetworkResponseMetrics(dummyHistogram, dummyHistogram, dummyHistogram, +// null, null, 0)); } - /** - * - * @param networkRequest - */ - void handleFileMetaDataRequest(NetworkRequest networkRequest) { - throw new UnsupportedOperationException("Not implemented yet"); + private List convertStoreToProtocolLogInfo(List logSegments) { + List logInfos = new ArrayList<>(); + for (com.github.ambry.store.LogInfo logSegment : logSegments) { + List indexSegments = new ArrayList<>(); + logSegment.getIndexSegments().forEach(indexSegment -> { + indexSegments.add(new com.github.ambry.protocol.FileInfo(indexSegment.getFileName(), indexSegment.getFileSize())); + }); + + List bloomFilters = new ArrayList<>(); + logSegment.getBloomFilters().forEach(bloomFilter -> { + bloomFilters.add(new com.github.ambry.protocol.FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSize())); + }); + + logInfos.add(new LogInfo( + logSegment.getLogSegment().getFileName(), logSegment.getLogSegment().getFileSize(), + indexSegments, bloomFilters)); + } + return logInfos; } /** diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 428bb55af2..8f73f5f13b 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -23,7 +23,6 @@ import com.github.ambry.cloud.BackupIntegrityMonitor; import com.github.ambry.cloud.RecoveryManager; import com.github.ambry.cloud.RecoveryNetworkClientFactory; -import 
com.github.ambry.clustermap.AmbryDataNode; import com.github.ambry.clustermap.AmbryServerDataNode; import com.github.ambry.clustermap.ClusterAgentsFactory; import com.github.ambry.clustermap.ClusterMap; @@ -39,7 +38,6 @@ import com.github.ambry.commons.NettySslHttp2Factory; import com.github.ambry.commons.SSLFactory; import com.github.ambry.commons.ServerMetrics; -import com.github.ambry.config.CloudConfig; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ConnectionPoolConfig; import com.github.ambry.config.DiskManagerConfig; @@ -104,7 +102,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; -import org.apache.logging.log4j.core.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -466,6 +463,13 @@ public void startup() throws InstantiationException { for (ClusterParticipant participant : clusterParticipants) { participant.participate(ambryStatsReports, accountStatsMySqlStore, accountServiceCallback); } + +// RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( +// (short) 0, 0, "", +// new com.github.ambry.clustermap.Partition((long)0, "", PartitionState.READ_WRITE, 1073741824), +// "hostName"); + requests.handleFileMetaDataRequest(EmptyRequest.getInstance()); + } else { throw new IllegalArgumentException("Unknown server execution mode"); } diff --git a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java index db08334f9a..d7683a1f09 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java @@ -227,10 +227,10 @@ public AmbryServerRequestsTest(boolean validateRequestOnStoreState, boolean enab Mockito.when(mockDelegate.unmarkStopped(anyList())).thenReturn(true); } - @Parameterized.Parameters - public static List data() { - return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); - } +// @Parameterized.Parameters +// public static List data() { +// return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); +// } private static Properties createProperties(boolean validateRequestOnStoreState, boolean handleUndeleteRequestEnabled) { @@ -1532,6 +1532,16 @@ null, new DiskIOScheduler(null), StoreTestUtils.DEFAULT_DISK_SPACE_ALLOCATOR, datanode, null); } + @Test + public void foo() throws IOException, InterruptedException { + RequestOrResponse get_metadata_request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( + (short) 0, 0, "", new MockPartitionId(), "hostName"); +// Response response = sendRequestGetResponse(get_metadata_request, ServerErrorCode.No_Error); + + NetworkRequest mockRequest = MockRequest.fromRequest(get_metadata_request); + ambryRequests.handleFileMetaDataRequest(mockRequest); + } + /** * Performs the AdminRequest and Response as well as checking if the content is a json and expected * @param partitionId necessary to fulfill the {@link AdminRequest} diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 5d7c0a726b..494952f7d9 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -254,31 +254,6 @@ public BlobStore(ReplicaId replicaId, 
StoreConfig config, ScheduledExecutorServi @Override public void start() throws StoreException { - List sealedLogsAndMetaDataFiles = getSealedLogSegmentFiles(); - if (null != sealedLogsAndMetaDataFiles) { - for (SealedFileInfo E : sealedLogsAndMetaDataFiles) { - System.out.println(E.getFileName() + ", size: " + E.getFileSize()); - logger.info("LS file: {} size: {}", E.getFileName(), E.getFileSize()); - - System.out.print(""); - List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment(dataDir, LogSegmentName.fromFilename(E.getFileName())); - if (null != allIndexSegmentsForLogSegment) { - for (SealedFileInfo is : allIndexSegmentsForLogSegment) { - System.out.println(is.getFileName() + ", size: " + is.getFileSize()); - logger.info("IS file: {} size: {}", is.getFileName(), is.getFileSize()); - } - } - System.out.print(""); - List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment(dataDir, LogSegmentName.fromFilename(E.getFileName())); - if (null != bloomFiltersForLogSegment) { - for (SealedFileInfo bf : bloomFiltersForLogSegment) { - System.out.println(bf.getFileName() + ", size: " + bf.getFileSize()); - logger.info("BF file: {} size: {}", bf.getFileName(), bf.getFileSize()); - } - } - } - } - synchronized (storeWriteLock) { if (started) { throw new StoreException("Store already started", StoreErrorCodes.Store_Already_Started); @@ -353,7 +328,8 @@ public void start() throws StoreException { } metrics.storeStartFailure.inc(); String err = String.format("Error while starting store for dir %s due to %s", dataDir, e.getMessage()); - throw new StoreException(err, e, StoreErrorCodes.Initialization_Error); + // [Dw-remove] + // throw new StoreException(err, e, StoreErrorCodes.Initialization_Error); } finally { context.stop(); } @@ -1344,23 +1320,51 @@ public void shutdown() throws StoreException { shutdown(false); } - List getSealedLogSegmentFiles(){ + List printAndReturnFiles() { + List result = new ArrayList<>(); + + List sealedLogsAndMetaDataFiles = getLogSegments(true); + if (null != sealedLogsAndMetaDataFiles) { + for (FileInfo E : sealedLogsAndMetaDataFiles) { + logger.info("[Dw] LS file: {} size: {}", E.getFileName(), E.getFileSize()); + + List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment( + dataDir, LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX)); + if (null != allIndexSegmentsForLogSegment) { + for (FileInfo is : allIndexSegmentsForLogSegment) { + logger.info("[Dw] IS file: {} size: {}", is.getFileName(), is.getFileSize()); + } + } + List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment( + dataDir, LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX)); + if (null != bloomFiltersForLogSegment) { + for (FileInfo bf : bloomFiltersForLogSegment) { + logger.info("[Dw] BF file: {} size: {}", bf.getFileName(), bf.getFileSize()); + } + } + result.add(new LogInfo(E, allIndexSegmentsForLogSegment, bloomFiltersForLogSegment)); + } + } + return result; + } + + List getLogSegments(boolean includeActiveLogSegment){ return log.getAllLogSegmentNames().stream() - .filter(segment -> log.getActiveSegment().getName() != segment) + .filter(segment -> includeActiveLogSegment || !segment.equals(log.getActiveSegment().getName())) .map(segment -> log.getSegment(segment)) - .map(segment -> new SealedFileInfo(segment.getName().toString(), segment.getView().getFirst().length())) + .map(segment -> new FileInfo(segment.getName().toString(), segment.getView().getFirst().length())) .collect(Collectors.toList()); } - List 
getAllIndexSegmentsForLogSegment(String dataDir, LogSegmentName logSegmentName){ + List getAllIndexSegmentsForLogSegment(String dataDir, LogSegmentName logSegmentName){ return Arrays.stream(PersistentIndex.getIndexSegmentFilesForLogSegment(dataDir, logSegmentName)) - .map(file -> new SealedFileInfo(file.getName(), file.length())) + .map(file -> new FileInfo(file.getName(), file.length())) .collect(Collectors.toList()); } - List getAllBloomFiltersForLogSegment(String dataDir, LogSegmentName logSegmentName){ + List getAllBloomFiltersForLogSegment(String dataDir, LogSegmentName logSegmentName){ return Arrays.stream(PersistentIndex.getBloomFilterFiles(dataDir, logSegmentName)) - .map(file -> new SealedFileInfo(file.getName(), file.length())) + .map(file -> new FileInfo(file.getName(), file.length())) .collect(Collectors.toList()); } diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index d6412b0b31..422dffa2ea 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -52,6 +52,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,6 +196,9 @@ void start(boolean shouldRemoveUnexpectedDirs) throws InterruptedException { Thread thread = Utils.newThread("store-startup-" + partitionAndStore.getKey(), () -> { try { partitionAndStore.getValue().start(); + // [Dw remove] +// logger.info("[Dw] PrintUtil for PartitionId: " + partitionAndStore.getKey().getId()); +// partitionAndStore.getValue().printAndReturnFiles(); } catch (Exception e) { numStoreFailures.incrementAndGet(); logger.error("Exception while starting store for the {}", partitionAndStore.getKey(), e); @@ -813,15 +817,34 @@ public DiskHealthStatus getDiskHealthStatus() { return diskHealthStatus; } + /** + * Checks if the file exists on the disk + * @param fileName + * @return + */ public boolean isFileExists(String fileName) { String filePath = this.disk.getMountPath() + File.separator + fileName; return new File(filePath).exists(); } + /** + * Gets the files for the given pattern from the disk + */ public List getFilesForPattern(Pattern pattern) throws IOException { return Utils.getFilesForPattern(this.disk.getMountPath(), pattern); } + /** + * Gets the log segment metadata files from in-memory data structures + * This method returns List of LogSegment File along with its IndexFiles, BloomFilterFiles + */ + public List getLogSegmentMetadataFiles() { + return stores.values().stream() + .map(BlobStore::printAndReturnFiles) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + /** Manages the disk healthchecking tasks such as creating,writing, reading, and deleting a file */ diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index 1f1956082e..5fd2e0bcee 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -173,6 +173,17 @@ public StorageManager(StoreConfig storeConfig, DiskManagerConfig diskManagerConf } } + /** + * Gets the log segment metadata files from in-memory data structures + * This method returns List of LogSegment File along with its 
IndexFiles, BloomFilterFiles + */ + public List getLogSegmentMetadataFiles() { + return partitionToDiskManager.values().stream() + .map(DiskManager::getLogSegmentMetadataFiles) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + /** * Checks whether the replicas are placed on the correct disks. If not, reshuffle the disks, write * the new state to Helix and exit. We assume that this ambry-server instance will then be restarted with the From 0ce0d91947327b86f236200f58d4ad3982802751 Mon Sep 17 00:00:00 2001 From: DevenAhluwalia Date: Tue, 14 Jan 2025 23:36:09 +0530 Subject: [PATCH 3/6] WIP --- .../network/LocalRequestResponseChannel.java | 2 +- .../protocol/FileCopyGetMetaDataRequest.java | 2 +- .../com/github/ambry/protocol/LogInfo.java | 20 ++++- .../ambry/protocol/RequestOrResponseType.java | 3 +- .../github/ambry/server/AmbryRequests.java | 48 ++++++------ .../com/github/ambry/server/AmbryServer.java | 77 +++++++++++++++++-- .../ambry/server/AmbryServerRequestsTest.java | 27 +++---- .../com/github/ambry/store/BlobStore.java | 27 +++++-- .../com/github/ambry/store/DiskManager.java | 12 +-- .../github/ambry/store/StorageManager.java | 12 +-- 10 files changed, 162 insertions(+), 68 deletions(-) diff --git a/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java b/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java index 8a7b1c9ac5..a18561da7c 100644 --- a/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java +++ b/ambry-network/src/main/java/com/github/ambry/network/LocalRequestResponseChannel.java @@ -110,7 +110,7 @@ public static class LocalChannelRequest implements NetworkRequest { private long startTimeInMs; private int processorId; - LocalChannelRequest(RequestInfo requestInfo, int processorId) { + public LocalChannelRequest(RequestInfo requestInfo, int processorId) { this.requestInfo = requestInfo; this.processorId = processorId; startTimeInMs = System.currentTimeMillis(); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java index f851de6af8..cb5fbbb0a3 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java @@ -54,7 +54,7 @@ public PartitionId getPartitionId() { public static FileCopyGetMetaDataRequest readFrom( @Nonnull DataInputStream stream, @Nonnull ClusterMap clusterMap) throws IOException { - Short versionId = stream.readShort(); + short versionId = stream.readShort(); validateVersion(versionId); int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java index c8e337bdb3..2bb76c0e9c 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -117,11 +117,23 @@ public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("LogInfo["); sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(","); - for(FileInfo fileInfo : indexFiles) { - sb.append(fileInfo.toString()); + + if(!indexFiles.isEmpty()) { + sb.append(" IndexFiles=["); + for (FileInfo fileInfo : indexFiles) { + 
sb.append(fileInfo.toString()); + } + sb.append("]"); + if(!bloomFilters.isEmpty()) { + sb.append(", "); + } } - for(FileInfo fileInfo: bloomFilters){ - sb.append(fileInfo.toString()); + if(!bloomFilters.isEmpty()){ + sb.append(" BloomFilters=["); + for(FileInfo fileInfo: bloomFilters){ + sb.append(fileInfo.toString()); + } + sb.append("]"); } sb.append("]"); return sb.toString(); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index f58154b554..5d803a45eb 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -41,6 +41,5 @@ public enum RequestOrResponseType { FileCopyGetChunkRequest, FileCopyGetChunkResponse, FileCopyGetMetaDataRequest, - FileCopyGetMetaDataResponse, - FileMetaDataRequest + FileCopyGetMetaDataResponse } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 073fbb5ddc..6edd57f5f6 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -15,6 +15,8 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.ambry.clustermap.ClusterMap; @@ -60,6 +62,7 @@ import com.github.ambry.protocol.CompositeSend; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; import com.github.ambry.protocol.FileCopyGetMetaDataResponse; import com.github.ambry.protocol.GetOption; import com.github.ambry.protocol.GetRequest; @@ -237,7 +240,7 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce case ReplicateBlobRequest: handleReplicateBlobRequest(networkRequest); break; - case FileMetaDataRequest: + case FileCopyGetMetaDataRequest: handleFileMetaDataRequest(networkRequest); break; default: @@ -1679,33 +1682,34 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In } /** - * - * @param request + * Handler for FileMetadataRequest */ void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { -// FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest = -// FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest = + FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + + List logSegments = ((StorageManager)storeManager) + .getLogSegmentMetadataFiles(fileCopyGetMetaDataRequest.getPartitionId(), true); - List logSegments = ((StorageManager)storeManager).getLogSegmentMetadataFiles(); List logInfos = convertStoreToProtocolLogInfo(logSegments); - FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse((short)0, 0, "", + FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse((short)1, 0, "", logSegments.size(), logInfos, ServerErrorCode.No_Error); -// Histogram dummyHistogram = new Histogram(new Reservoir() { -// @Override -// public int size() { -// 
return 0; -// } -// -// @Override -// public void update(long value) { -// } -// -// @Override -// public Snapshot getSnapshot() { -// return null; -// } -// }); + Histogram dummyHistogram = new Histogram(new Reservoir() { + @Override + public int size() { + return 0; + } + + @Override + public void update(long value) { + } + + @Override + public Snapshot getSnapshot() { + return null; + } + }); logger.info("[Dw] Api response - " + response); System.out.println("[Dw] Api response - " + response); diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 8f73f5f13b..d25f836121 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -30,6 +30,8 @@ import com.github.ambry.clustermap.CompositeClusterManager; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.HelixClusterManager; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.PartitionState; import com.github.ambry.clustermap.StaticClusterManager; import com.github.ambry.clustermap.VcrClusterAgentsFactory; import com.github.ambry.commons.Callback; @@ -60,9 +62,11 @@ import com.github.ambry.network.NettyServerRequestResponseChannel; import com.github.ambry.network.NetworkClientFactory; import com.github.ambry.network.NetworkMetrics; +import com.github.ambry.network.NetworkRequest; import com.github.ambry.network.NetworkServer; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; +import com.github.ambry.network.RequestInfo; import com.github.ambry.network.ServerRequestResponseHelper; import com.github.ambry.network.SocketNetworkClientFactory; import com.github.ambry.network.SocketServer; @@ -72,6 +76,7 @@ import com.github.ambry.network.http2.Http2ServerMetrics; import com.github.ambry.notification.NotificationSystem; import com.github.ambry.protocol.RequestHandlerPool; +import com.github.ambry.protocol.RequestOrResponse; import com.github.ambry.repair.RepairRequestsDb; import com.github.ambry.repair.RepairRequestsDbFactory; import com.github.ambry.replication.FindTokenHelper; @@ -88,10 +93,14 @@ import com.github.ambry.store.StorageManager; import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; +import com.github.ambry.utils.ByteBufferChannel; +import com.github.ambry.utils.ByteBufferDataInputStream; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -463,13 +472,7 @@ public void startup() throws InstantiationException { for (ClusterParticipant participant : clusterParticipants) { participant.participate(ambryStatsReports, accountStatsMySqlStore, accountServiceCallback); } - -// RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( -// (short) 0, 0, "", -// new com.github.ambry.clustermap.Partition((long)0, "", PartitionState.READ_WRITE, 1073741824), -// "hostName"); - requests.handleFileMetaDataRequest(EmptyRequest.getInstance()); - + invokeGetMetadataApi(); } else { throw new IllegalArgumentException("Unknown server execution mode"); } @@ -488,6 +491,66 @@ public void startup() throws InstantiationException { } } + private void 
invokeGetMetadataApi() { + List partitionIds = clusterMap.getAllPartitionIds(null); + + partitionIds.forEach(partitionId -> { + RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( + (short) 1, 0, "", partitionId, "hostName"); + + NetworkRequest mockRequest = null; + try { + mockRequest = MockRequest.fromRequest(request); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + requests.handleRequests(mockRequest); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + private static class MockRequest implements NetworkRequest { + + private final InputStream stream; + + /** + * Constructs a {@link MockRequest}. + * @param stream the {@link InputStream} that will be returned on a call to {@link #getInputStream()}. + */ + private MockRequest(InputStream stream) { + this.stream = stream; + } + + /** + * Constructs a {@link MockRequest} from {@code request}. + * @param request the {@link RequestOrResponse} to construct the {@link MockRequest} for. + * @return an instance of {@link MockRequest} that represents {@code request}. + * @throws IOException + */ + static MockRequest fromRequest(RequestOrResponse request) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate((int) request.sizeInBytes()); + request.writeTo(new ByteBufferChannel(buffer)); + request.release(); + buffer.flip(); + // read length (to bring it to a state where AmbryRequests can handle it). + buffer.getLong(); + return new MockRequest(new ByteBufferDataInputStream(buffer)); + } + + @Override + public InputStream getInputStream() { + return stream; + } + + @Override + public long getStartTimeInMs() { + return 0; + } + } + /** * This method is expected to be called in the exit path as long as the AmbryServer instance construction was * successful. This is expected to be called even if {@link #startup()} did not succeed. 
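[Editor's note — illustration only, not part of the patch series]

The MockRequest.fromRequest helper added above serializes the request with writeTo and then immediately calls buffer.getLong() to discard the leading size field, because AmbryRequests expects the network layer to have already consumed that 8-byte size prefix before the stream reaches a handler (the patch's own comment: "read length (to bring it to a state where AmbryRequests can handle it)"). Below is a minimal, self-contained sketch of that size-prefix convention in plain Java with a hypothetical payload; it is not Ambry code.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class SizePrefixedRequestSketch {
  public static void main(String[] args) throws IOException {
    byte[] payload = {1, 2, 3, 4};                    // stands in for a serialized request body
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + payload.length);
    buffer.putLong(buffer.capacity());                // total-size prefix, written first by the sender
    buffer.put(payload);
    buffer.flip();
    buffer.getLong();                                 // drop the size prefix, as MockRequest.fromRequest does
    DataInputStream handlerStream = new DataInputStream(
        new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining()));
    System.out.println("first payload byte seen by the handler: " + handlerStream.readByte()); // prints 1
  }
}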
diff --git a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java index d7683a1f09..97299f9516 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java @@ -227,10 +227,11 @@ public AmbryServerRequestsTest(boolean validateRequestOnStoreState, boolean enab Mockito.when(mockDelegate.unmarkStopped(anyList())).thenReturn(true); } -// @Parameterized.Parameters -// public static List data() { + @Parameterized.Parameters + public static List data() { // return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); -// } + return Arrays.asList(new Object[][]{{false, false}}); + } private static Properties createProperties(boolean validateRequestOnStoreState, boolean handleUndeleteRequestEnabled) { @@ -1532,16 +1533,6 @@ null, new DiskIOScheduler(null), StoreTestUtils.DEFAULT_DISK_SPACE_ALLOCATOR, datanode, null); } - @Test - public void foo() throws IOException, InterruptedException { - RequestOrResponse get_metadata_request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( - (short) 0, 0, "", new MockPartitionId(), "hostName"); -// Response response = sendRequestGetResponse(get_metadata_request, ServerErrorCode.No_Error); - - NetworkRequest mockRequest = MockRequest.fromRequest(get_metadata_request); - ambryRequests.handleFileMetaDataRequest(mockRequest); - } - /** * Performs the AdminRequest and Response as well as checking if the content is a json and expected * @param partitionId necessary to fulfill the {@link AdminRequest} @@ -1655,6 +1646,16 @@ public void healthCheckTest() throws InterruptedException, StoreException, IOExc setPropertyToAmbryRequests(currentProperties, "disk.manager.disk.healthcheck.enabled", "false"); } + @Test + public void foo() throws IOException, InterruptedException { + List partitionIds = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS); + + RequestOrResponse get_metadata_request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( + (short) 1, 0, "", partitionIds.get(0), "hostName"); + + sendRequestGetResponse(get_metadata_request, ServerErrorCode.No_Error); + } + // helpers // general diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 494952f7d9..74292fec2c 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -1320,23 +1320,27 @@ public void shutdown() throws StoreException { shutdown(false); } - List printAndReturnFiles() { + /** + * Gets the log segment metadata files from in-memory data structures + * This method returns List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles + */ + List getLogSegmentMetadataFiles(boolean includeActiveLogSegment) { List result = new ArrayList<>(); - List sealedLogsAndMetaDataFiles = getLogSegments(true); + List sealedLogsAndMetaDataFiles = getLogSegments(includeActiveLogSegment); if (null != sealedLogsAndMetaDataFiles) { for (FileInfo E : sealedLogsAndMetaDataFiles) { logger.info("[Dw] LS file: {} size: {}", E.getFileName(), E.getFileSize()); - List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment( - dataDir, LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX)); + LogSegmentName logSegmentName = 
LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX); + + List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment(dataDir, logSegmentName); if (null != allIndexSegmentsForLogSegment) { for (FileInfo is : allIndexSegmentsForLogSegment) { logger.info("[Dw] IS file: {} size: {}", is.getFileName(), is.getFileSize()); } } - List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment( - dataDir, LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX)); + List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment(dataDir, logSegmentName); if (null != bloomFiltersForLogSegment) { for (FileInfo bf : bloomFiltersForLogSegment) { logger.info("[Dw] BF file: {} size: {}", bf.getFileName(), bf.getFileSize()); @@ -1348,20 +1352,31 @@ List printAndReturnFiles() { return result; } + /** + * Get all log segments in the store. + * Param includeActiveLogSegment is used to determine if the active log segment should be included in the result. + */ List getLogSegments(boolean includeActiveLogSegment){ return log.getAllLogSegmentNames().stream() .filter(segment -> includeActiveLogSegment || !segment.equals(log.getActiveSegment().getName())) + .filter(segment -> !segment.isSingleSegment()) .map(segment -> log.getSegment(segment)) .map(segment -> new FileInfo(segment.getName().toString(), segment.getView().getFirst().length())) .collect(Collectors.toList()); } + /** + * Get all index segments for a log segment. + */ List getAllIndexSegmentsForLogSegment(String dataDir, LogSegmentName logSegmentName){ return Arrays.stream(PersistentIndex.getIndexSegmentFilesForLogSegment(dataDir, logSegmentName)) .map(file -> new FileInfo(file.getName(), file.length())) .collect(Collectors.toList()); } + /** + * Get all bloom filter files for a log segment. 
+ */ List getAllBloomFiltersForLogSegment(String dataDir, LogSegmentName logSegmentName){ return Arrays.stream(PersistentIndex.getBloomFilterFiles(dataDir, logSegmentName)) .map(file -> new FileInfo(file.getName(), file.length())) diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index 422dffa2ea..5a6459bbd4 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -836,13 +836,13 @@ public List getFilesForPattern(Pattern pattern) throws IOException { /** * Gets the log segment metadata files from in-memory data structures - * This method returns List of LogSegment File along with its IndexFiles, BloomFilterFiles + * This method returns List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles */ - public List getLogSegmentMetadataFiles() { - return stores.values().stream() - .map(BlobStore::printAndReturnFiles) - .flatMap(List::stream) - .collect(Collectors.toList()); + List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) { + if (!stores.containsKey(partitionId)) { + throw new IllegalArgumentException("BlobStore for partition " + partitionId + " is not found on disk " + disk); + } + return stores.get(partitionId).getLogSegmentMetadataFiles(includeActiveLogSegment); } /** diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index 5fd2e0bcee..3370f29e25 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -175,13 +175,13 @@ public StorageManager(StoreConfig storeConfig, DiskManagerConfig diskManagerConf /** * Gets the log segment metadata files from in-memory data structures - * This method returns List of LogSegment File along with its IndexFiles, BloomFilterFiles + * This method returns List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles */ - public List getLogSegmentMetadataFiles() { - return partitionToDiskManager.values().stream() - .map(DiskManager::getLogSegmentMetadataFiles) - .flatMap(List::stream) - .collect(Collectors.toList()); + public List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) { + if (!partitionToDiskManager.containsKey(partitionId)) { + throw new IllegalArgumentException("DiskManager not found for partition " + partitionId); + } + return partitionToDiskManager.get(partitionId).getLogSegmentMetadataFiles(partitionId, includeActiveLogSegment); } /** From 2a23d33d215727dbc199f26b98fbf6781212d4cd Mon Sep 17 00:00:00 2001 From: DevenAhluwalia Date: Fri, 17 Jan 2025 18:41:43 +0530 Subject: [PATCH 4/6] WIP --- .../network/ServerRequestResponseHelper.java | 8 ++++++++ .../protocol/FileCopyGetMetaDataRequest.java | 9 ++++++--- .../protocol/FileCopyGetMetaDataResponse.java | 9 ++++++++- .../java/com/github/ambry/protocol/LogInfo.java | 4 ++-- .../com/github/ambry/server/AmbryRequests.java | 15 ++++++++------- .../ambry/server/AmbryServerRequestsTest.java | 4 ++-- 6 files changed, 34 insertions(+), 15 deletions(-) diff --git a/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java b/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java index c67c1eee25..d26a2004fd 100644 --- 
a/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java +++ b/ambry-network/src/main/java/com/github/ambry/network/ServerRequestResponseHelper.java @@ -19,6 +19,8 @@ import com.github.ambry.protocol.AdminResponse; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; +import com.github.ambry.protocol.FileCopyGetMetaDataResponse; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; import com.github.ambry.protocol.PurgeRequest; @@ -96,6 +98,9 @@ public RequestOrResponse getDecodedRequest(NetworkRequest networkRequest) throws case PurgeRequest: request = PurgeRequest.readFrom(dis, clusterMap); break; + case FileCopyGetMetaDataRequest: + request = FileCopyGetMetaDataRequest.readFrom(dis, clusterMap); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -153,6 +158,9 @@ public Response createErrorResponse(RequestOrResponse request, ServerErrorCode s case PurgeRequest: response = new PurgeResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode); break; + case FileCopyGetMetaDataRequest: + response = new FileCopyGetMetaDataResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode); + break; default: throw new UnsupportedOperationException("Request type not supported"); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java index cb5fbbb0a3..9319ab019e 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java @@ -25,7 +25,7 @@ public class FileCopyGetMetaDataRequest extends RequestOrResponse{ private final PartitionId partitionId; private final String hostName; - private static final short File_Metadata_Request_Version_V1 = 1; + public static final short File_Metadata_Request_Version_V1 = 1; private static final int HostName_Field_Size_In_Bytes = 4; public FileCopyGetMetaDataRequest( @@ -65,8 +65,11 @@ public static FileCopyGetMetaDataRequest readFrom( public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName) - .append("]"); + sb.append("FileCopyGetMetaDataRequest[") + .append("PartitionId=") + .append(partitionId.getId()).append(", HostName=") + .append(hostName) + .append("]"); return sb.toString(); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java index 3b0cd3311d..e5689a8ef0 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java @@ -27,16 +27,23 @@ public class FileCopyGetMetaDataResponse extends Response { private final int numberOfLogfiles; private final List logInfos; - private static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1; + public static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1; + + static short CURRENT_VERSION = File_Copy_Protocol_Metadata_Response_Version_V1; public FileCopyGetMetaDataResponse(short versionId, int 
correlationId, String clientId, int numberOfLogfiles, List logInfos, ServerErrorCode errorCode) { super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode); + validateVersion(versionId); this.numberOfLogfiles = numberOfLogfiles; this.logInfos = logInfos; } + public FileCopyGetMetaDataResponse(int correlationId, String clientId, ServerErrorCode serverErrorCode) { + this(CURRENT_VERSION, correlationId, clientId, 0, new ArrayList<>(), serverErrorCode); + } + public static FileCopyGetMetaDataResponse readFrom( @Nonnull DataInputStream stream) throws IOException { RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java index 2bb76c0e9c..b1fc05a02f 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -116,10 +116,10 @@ public void writeTo(@Nonnull ByteBuf buf){ public String toString(){ StringBuilder sb = new StringBuilder(); sb.append("LogInfo["); - sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(","); + sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes); if(!indexFiles.isEmpty()) { - sb.append(" IndexFiles=["); + sb.append(", IndexFiles=["); for (FileInfo fileInfo : indexFiles) { sb.append(fileInfo.toString()); } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 6edd57f5f6..3607245a71 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -1693,7 +1693,8 @@ void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedExcepti List logInfos = convertStoreToProtocolLogInfo(logSegments); - FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse((short)1, 0, "", + FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( + FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, 0, "", logSegments.size(), logInfos, ServerErrorCode.No_Error); Histogram dummyHistogram = new Histogram(new Reservoir() { @Override @@ -1711,13 +1712,13 @@ public Snapshot getSnapshot() { } }); - logger.info("[Dw] Api response - " + response); - System.out.println("[Dw] Api response - " + response); + logger.info("Dw: Api response, partition-" + fileCopyGetMetaDataRequest.getPartitionId().getId() + " " + response); + System.out.println("Dw: Api response, partition-" + fileCopyGetMetaDataRequest.getPartitionId().getId() + " " + response); -// requestResponseChannel.sendResponse( -// response, request, -// new ServerNetworkResponseMetrics(dummyHistogram, dummyHistogram, dummyHistogram, -// null, null, 0)); + requestResponseChannel.sendResponse( + response, request, + new ServerNetworkResponseMetrics(dummyHistogram, dummyHistogram, dummyHistogram, + null, null, 0)); } private List convertStoreToProtocolLogInfo(List logSegments) { diff --git a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java index 97299f9516..6445edcfdf 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java +++ 
b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java @@ -1650,10 +1650,10 @@ public void healthCheckTest() throws InterruptedException, StoreException, IOExc public void foo() throws IOException, InterruptedException { List partitionIds = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS); - RequestOrResponse get_metadata_request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( + RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( (short) 1, 0, "", partitionIds.get(0), "hostName"); - sendRequestGetResponse(get_metadata_request, ServerErrorCode.No_Error); + sendRequestGetResponse(request, ServerErrorCode.No_Error); } // helpers From 4d89d8abf23f0587c180ab763a3458a04c41a632 Mon Sep 17 00:00:00 2001 From: DevenAhluwalia Date: Tue, 21 Jan 2025 19:13:52 +0530 Subject: [PATCH 5/6] WIP --- .../com/github/ambry/server/StoreManager.java | 9 +++ .../ambry/cloud/CloudStorageManager.java | 6 ++ .../protocol/FileCopyGetMetaDataResponse.java | 8 +- .../github/ambry/server/AmbryRequests.java | 40 ++++++---- .../com/github/ambry/server/AmbryServer.java | 75 ++++++------------- .../ambry/server/AmbryServerRequestsTest.java | 6 +- 6 files changed, 71 insertions(+), 73 deletions(-) diff --git a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java index ff7d492945..18b510cb07 100644 --- a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java @@ -16,6 +16,7 @@ import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.store.LogInfo; import com.github.ambry.store.Store; import com.github.ambry.store.StoreException; import java.io.IOException; @@ -153,4 +154,12 @@ public interface StoreManager { * @throws IOException */ boolean isFilesExistForPattern(PartitionId partitionId, Pattern pattern) throws IOException; + + /** + * Get the list of log segment metadata files for a given partition. 
+ * @param partitionId + * @param includeActiveLogSegment + * @return List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles + */ + public List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment); } diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java index 09c0a786ce..73d5207c2a 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java @@ -22,6 +22,7 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.server.StoreManager; +import com.github.ambry.store.LogInfo; import com.github.ambry.store.Store; import java.nio.file.FileStore; import java.util.Collection; @@ -151,6 +152,11 @@ public boolean isFilesExistForPattern(PartitionId partitionId, Pattern allLogSeg throw new UnsupportedOperationException("Method not supported"); } + @Override + public List getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override public List setBlobStoreStoppedState(List partitionIds, boolean markStop) { throw new UnsupportedOperationException("Method not supported"); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java index e5689a8ef0..f11d275c71 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java @@ -82,8 +82,12 @@ public long sizeInBytes() { public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append( - logInfos.toString()).append("]"); + sb + .append("FileCopyGetMetaDataResponse[NumberOfLogfiles=") + .append(numberOfLogfiles) + .append(", logInfoList") + .append(logInfos.toString()) + .append("]"); return sb.toString(); } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 3607245a71..8465602c51 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -97,7 +97,6 @@ import com.github.ambry.store.Message; import com.github.ambry.store.MessageErrorInfo; import com.github.ambry.store.MessageInfo; -import com.github.ambry.store.StorageManager; import com.github.ambry.store.Store; import com.github.ambry.store.StoreBatchDeleteInfo; import com.github.ambry.store.StoreErrorCodes; @@ -241,7 +240,7 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce handleReplicateBlobRequest(networkRequest); break; case FileCopyGetMetaDataRequest: - handleFileMetaDataRequest(networkRequest); + handleFileCopyGetMetaDataRequest(networkRequest); break; default: throw new UnsupportedOperationException("Request type not supported"); @@ -1684,18 +1683,32 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In /** * Handler for FileMetadataRequest */ - void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, 
IOException { + void handleFileCopyGetMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest = FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); - List logSegments = ((StorageManager)storeManager) - .getLogSegmentMetadataFiles(fileCopyGetMetaDataRequest.getPartitionId(), true); - - List logInfos = convertStoreToProtocolLogInfo(logSegments); + List logInfos; + try { + List logSegments = + storeManager.getLogSegmentMetadataFiles(fileCopyGetMetaDataRequest.getPartitionId(), true); + logInfos = convertStoreToProtocolLogInfo(logSegments); + } catch (Exception e) { + logger.error("Error while getting log segment metadata for partition {}", + fileCopyGetMetaDataRequest.getPartitionId().getId(), e); + FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( + FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, + fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(), + 0, null, ServerErrorCode.Unknown_Error); + requestResponseChannel.sendResponse(response, request, null); + return; + } FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( - FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, 0, "", - logSegments.size(), logInfos, ServerErrorCode.No_Error); + FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, + fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(), + logInfos.size(), logInfos, ServerErrorCode.No_Error); + + // TODO: Add metrics for this operation Histogram dummyHistogram = new Histogram(new Reservoir() { @Override public int size() { @@ -1711,14 +1724,11 @@ public Snapshot getSnapshot() { return null; } }); + ServerNetworkResponseMetrics serverNetworkResponseMetrics = new ServerNetworkResponseMetrics(dummyHistogram, + dummyHistogram, dummyHistogram, null, null, 0); logger.info("Dw: Api response, partition-" + fileCopyGetMetaDataRequest.getPartitionId().getId() + " " + response); - System.out.println("Dw: Api response, partition-" + fileCopyGetMetaDataRequest.getPartitionId().getId() + " " + response); - - requestResponseChannel.sendResponse( - response, request, - new ServerNetworkResponseMetrics(dummyHistogram, dummyHistogram, dummyHistogram, - null, null, 0)); + requestResponseChannel.sendResponse(response, request, serverNetworkResponseMetrics); } private List convertStoreToProtocolLogInfo(List logSegments) { diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index d25f836121..cacc2b8519 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -56,6 +56,8 @@ import com.github.ambry.messageformat.BlobStoreHardDelete; import com.github.ambry.messageformat.BlobStoreRecovery; import com.github.ambry.network.BlockingChannelConnectionPool; +import com.github.ambry.network.ChannelOutput; +import com.github.ambry.network.ConnectedChannel; import com.github.ambry.network.ConnectionPool; import com.github.ambry.network.LocalNetworkClientFactory; import com.github.ambry.network.LocalRequestResponseChannel; @@ -75,6 +77,8 @@ import com.github.ambry.network.http2.Http2NetworkClientFactory; import com.github.ambry.network.http2.Http2ServerMetrics; import 
com.github.ambry.notification.NotificationSystem; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; +import com.github.ambry.protocol.FileCopyGetMetaDataResponse; import com.github.ambry.protocol.RequestHandlerPool; import com.github.ambry.protocol.RequestOrResponse; import com.github.ambry.repair.RepairRequestsDb; @@ -472,7 +476,6 @@ public void startup() throws InstantiationException { for (ClusterParticipant participant : clusterParticipants) { participant.participate(ambryStatsReports, accountStatsMySqlStore, accountServiceCallback); } - invokeGetMetadataApi(); } else { throw new IllegalArgumentException("Unknown server execution mode"); } @@ -491,66 +494,30 @@ public void startup() throws InstantiationException { } } - private void invokeGetMetadataApi() { + private void testGetMetadataApi() { List partitionIds = clusterMap.getAllPartitionIds(null); + FileCopyGetMetaDataRequest request = new FileCopyGetMetaDataRequest( + FileCopyGetMetaDataRequest.File_Metadata_Request_Version_V1, 0, "", partitionIds.get(0), "hostName"); - partitionIds.forEach(partitionId -> { - RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( - (short) 1, 0, "", partitionId, "hostName"); + partitionIds.get(0).getReplicaIds().forEach(replicaId -> { + logger.info("Dw: partitionId: {}, replicaId: {}", partitionIds.get(0), replicaId); - NetworkRequest mockRequest = null; - try { - mockRequest = MockRequest.fromRequest(request); - } catch (IOException e) { - throw new RuntimeException(e); - } - try { - requests.handleRequests(mockRequest); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (replicaId.getDataNodeId().getHostname().equals("ltx1-app3645.stg.linkedin.com")) { + DataNodeId targetDataNodeId = replicaId.getDataNodeId(); + try { + logger.info("Dw: Request: {}", request); + ConnectedChannel connectedChannel = + connectionPool.checkOutConnection(targetDataNodeId.getHostname(), targetDataNodeId.getPortToConnectTo(), 40); + ChannelOutput channelOutput = connectedChannel.sendAndReceive(request); + FileCopyGetMetaDataResponse response = FileCopyGetMetaDataResponse.readFrom(channelOutput.getInputStream()); + logger.info("Dw: Response: {}", response); + } catch (Exception e) { + logger.error("Dw: Error while sending request to " + targetDataNodeId, e); + } } }); } - private static class MockRequest implements NetworkRequest { - - private final InputStream stream; - - /** - * Constructs a {@link MockRequest}. - * @param stream the {@link InputStream} that will be returned on a call to {@link #getInputStream()}. - */ - private MockRequest(InputStream stream) { - this.stream = stream; - } - - /** - * Constructs a {@link MockRequest} from {@code request}. - * @param request the {@link RequestOrResponse} to construct the {@link MockRequest} for. - * @return an instance of {@link MockRequest} that represents {@code request}. - * @throws IOException - */ - static MockRequest fromRequest(RequestOrResponse request) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate((int) request.sizeInBytes()); - request.writeTo(new ByteBufferChannel(buffer)); - request.release(); - buffer.flip(); - // read length (to bring it to a state where AmbryRequests can handle it). 
- buffer.getLong(); - return new MockRequest(new ByteBufferDataInputStream(buffer)); - } - - @Override - public InputStream getInputStream() { - return stream; - } - - @Override - public long getStartTimeInMs() { - return 0; - } - } - /** * This method is expected to be called in the exit path as long as the AmbryServer instance construction was * successful. This is expected to be called even if {@link #startup()} did not succeed. diff --git a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java index 6445edcfdf..f5c5413fee 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com/github/ambry/server/AmbryServerRequestsTest.java @@ -64,6 +64,7 @@ import com.github.ambry.protocol.CatchupStatusAdminResponse; import com.github.ambry.protocol.DeleteRequest; import com.github.ambry.protocol.DeleteResponse; +import com.github.ambry.protocol.FileCopyGetMetaDataRequest; import com.github.ambry.protocol.GetOption; import com.github.ambry.protocol.GetRequest; import com.github.ambry.protocol.GetResponse; @@ -1647,11 +1648,12 @@ public void healthCheckTest() throws InterruptedException, StoreException, IOExc } @Test - public void foo() throws IOException, InterruptedException { + public void fileCopyGetMetaDataRequestTest() throws IOException, InterruptedException { List partitionIds = clusterMap.getWritablePartitionIds(DEFAULT_PARTITION_CLASS); RequestOrResponse request = new com.github.ambry.protocol.FileCopyGetMetaDataRequest( - (short) 1, 0, "", partitionIds.get(0), "hostName"); + FileCopyGetMetaDataRequest.File_Metadata_Request_Version_V1, 0, "", + partitionIds.get(0), "hostName"); sendRequestGetResponse(request, ServerErrorCode.No_Error); } From 544e5e84fb7639134d9c6ebc2322a95cadb1b3a4 Mon Sep 17 00:00:00 2001 From: DevenAhluwalia Date: Sat, 8 Feb 2025 22:17:25 +0530 Subject: [PATCH 6/6] WIP --- .../java/com/github/ambry/store/FileInfo.java | 14 ++- .../java/com/github/ambry/store/LogInfo.java | 14 ++- .../github/ambry/commons/ServerMetrics.java | 25 ++++++ .../protocol/FileCopyGetMetaDataRequest.java | 11 ++- .../protocol/FileCopyGetMetaDataResponse.java | 6 +- .../com/github/ambry/protocol/FileInfo.java | 8 +- .../github/ambry/protocol/RequestVisitor.java | 6 ++ .../github/ambry/server/AmbryRequests.java | 86 ++++++++++--------- .../com/github/ambry/store/BlobStore.java | 14 +-- 9 files changed, 122 insertions(+), 62 deletions(-) diff --git a/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java b/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java index 700c99faff..3f30e2f4e7 100644 --- a/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java +++ b/ambry-api/src/main/java/com/github/ambry/store/FileInfo.java @@ -18,7 +18,9 @@ public class FileInfo { private final String fileName; private final long fileSize; - public FileInfo(String fileName, Long fileSize) { + public FileInfo( + String fileName, + Long fileSize) { this.fileName = fileName; this.fileSize = fileSize; } @@ -30,4 +32,12 @@ public String getFileName() { public Long getFileSize() { return fileSize; } -} + + @Override + public String toString() { + return "FileInfo{" + + "fileName='" + fileName + '\'' + + ", fileSize=" + fileSize + + '}'; + } +} \ No newline at end of file diff --git a/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java b/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java index 
6a2da09e30..95be3c6678 100644 --- a/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java +++ b/ambry-api/src/main/java/com/github/ambry/store/LogInfo.java @@ -22,7 +22,10 @@ public class LogInfo { private final List indexSegments; private final List bloomFilters; - public LogInfo(FileInfo logSegment, List indexSegments, List bloomFilters) { + public LogInfo( + FileInfo logSegment, + List indexSegments, + List bloomFilters) { this.logSegment = logSegment; this.indexSegments = indexSegments; this.bloomFilters = bloomFilters; @@ -42,4 +45,13 @@ public List getIndexSegments() { public List getBloomFilters() { return Collections.unmodifiableList(bloomFilters); } + + @Override + public String toString() { + return "LogInfo{" + + "logSegment=" + logSegment + + ", indexSegments=" + indexSegments + + ", bloomFilters=" + bloomFilters + + '}'; + } } diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java b/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java index 91786e2d45..ad5eb93752 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java @@ -101,6 +101,15 @@ public class ServerMetrics { public final Histogram deleteBlobProcessingTimeInMs; public final Histogram deleteBlobResponseQueueTimeInMs; + public final Histogram fileCopyGetMetadataRequestQueueTimeInMs; + public final Histogram fileCopyGetMetadataProcessingTimeInMs; + public final Histogram fileCopyGetMetadataResponseQueueTimeInMs; + public final Histogram fileCopyGetMetadataSendTimeInMs; + public final Histogram fileCopyGetMetadataTotalTimeInMs; + public final Meter fileCopyGetMetadataRequestRate; + public final Meter fileCopyGetMetadataDroppedRate; + + public final Histogram batchDeleteBlobRequestQueueTimeInMs; public final Histogram batchDeleteBlobProcessingTimeInMs; public final Histogram batchDeleteBlobResponseQueueTimeInMs; @@ -419,6 +428,22 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobProcessingTime")); deleteBlobResponseQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobResponseQueueTime")); + + fileCopyGetMetadataRequestQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataRequestQueueTimeInMs")); + fileCopyGetMetadataProcessingTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataProcessingTimeInMs")); + fileCopyGetMetadataResponseQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataResponseQueueTimeInMs")); + fileCopyGetMetadataSendTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataSendTimeInMs")); + fileCopyGetMetadataTotalTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataTotalTimeInMs")); + fileCopyGetMetadataRequestRate = + registry.meter(MetricRegistry.name(requestClass, "FileCopyGetMetadataRequestRate")); + fileCopyGetMetadataDroppedRate = + registry.meter(MetricRegistry.name(requestClass, "FileCopyGetMetadataDroppedRate")); + batchDeleteBlobRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "BatchDeleteBlobRequestQueueTimeInMs")); batchDeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "BatchDeleteBlobProcessingTimeInMs")); diff --git 
a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java index 9319ab019e..1234762f8c 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java @@ -22,7 +22,7 @@ import javax.annotation.Nonnull; -public class FileCopyGetMetaDataRequest extends RequestOrResponse{ +public class FileCopyGetMetaDataRequest extends RequestOrResponse { private final PartitionId partitionId; private final String hostName; public static final short File_Metadata_Request_Version_V1 = 1; @@ -56,10 +56,12 @@ public static FileCopyGetMetaDataRequest readFrom( @Nonnull ClusterMap clusterMap) throws IOException { short versionId = stream.readShort(); validateVersion(versionId); + int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); String hostName = Utils.readIntString(stream); PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + return new FileCopyGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName); } @@ -73,10 +75,17 @@ public String toString() { return sb.toString(); } + @Override + public void accept(RequestVisitor visitor) { + visitor.visit(this); + } + + @Override public long sizeInBytes() { return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length; } + @Override protected void prepareBuffer() { super.prepareBuffer(); Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java index f11d275c71..4747af4310 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java @@ -32,7 +32,7 @@ public class FileCopyGetMetaDataResponse extends Response { static short CURRENT_VERSION = File_Copy_Protocol_Metadata_Response_Version_V1; public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles, - List logInfos, ServerErrorCode errorCode) { + @Nonnull List logInfos, @Nonnull ServerErrorCode errorCode) { super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode); validateVersion(versionId); @@ -60,7 +60,6 @@ public static FileCopyGetMetaDataResponse readFrom( //Setting the number of logfiles to 0 as there are no logfiles to be read. 
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, 0, new ArrayList<>(), errorCode); } - int numberOfLogfiles = stream.readInt(); List logInfos = new ArrayList<>(); for (int i = 0; i < numberOfLogfiles; i++) { @@ -68,6 +67,8 @@ public static FileCopyGetMetaDataResponse readFrom( } return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfos, errorCode); } + + @Override protected void prepareBuffer() { super.prepareBuffer(); bufferToSend.writeInt(numberOfLogfiles); @@ -76,6 +77,7 @@ protected void prepareBuffer() { } } + @Override public long sizeInBytes() { return super.sizeInBytes() + Integer.BYTES + logInfos.stream().mapToLong(LogInfo::sizeInBytes).sum(); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java index e6e1991591..dbe612f9f1 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java @@ -34,7 +34,7 @@ public class FileInfo { public FileInfo( @Nonnull String fileName, - @Nonnull long fileSize) { + long fileSize) { this.fileName = fileName; this.fileSizeInBytes = fileSize; } @@ -56,8 +56,10 @@ public void writeTo(@Nonnull ByteBuf buf) { public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes) - .append("]"); + sb.append("FileInfo[") + .append("FileName=").append(fileName) + .append(", FileSizeInBytes=").append(fileSizeInBytes) + .append("]"); return sb.toString(); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestVisitor.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestVisitor.java index bd5f067b87..1d01e0bf63 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestVisitor.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestVisitor.java @@ -48,6 +48,12 @@ public interface RequestVisitor { */ void visit(BatchDeleteRequest deleteRequest); + /** + * Performs any actions related to FileCopyGetMetaData request. + * @param fileCopyGetMetaDataRequest to visit. + */ + void visit(FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest); + /** * Performs any actions related to Un-delete request. * @param undeleteRequest to visit.
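For reference, a minimal client-side sketch (not part of the patch) of the FileCopyGetMetaData round trip that these protocol classes enable, following the checkOutConnection/sendAndReceive pattern used in testGetMetadataApi. The partitionId, hostName, port and checkoutTimeoutMs values, as well as the getError()/getLogInfos() accessors on the response, are assumptions made for illustration.

// Hypothetical caller: build the request, exchange it over a pooled channel, decode the response.
// Connection handling and response accessors below are assumed, not taken from the patch.
FileCopyGetMetaDataRequest request = new FileCopyGetMetaDataRequest(
    FileCopyGetMetaDataRequest.File_Metadata_Request_Version_V1, correlationId, clientId,
    partitionId, hostName);
ConnectedChannel channel = connectionPool.checkOutConnection(hostName, port, checkoutTimeoutMs);
try {
  ChannelOutput channelOutput = channel.sendAndReceive(request);
  FileCopyGetMetaDataResponse response =
      FileCopyGetMetaDataResponse.readFrom(channelOutput.getInputStream());
  if (response.getError() == ServerErrorCode.No_Error) {
    // Each LogInfo carries one log segment file plus its index segment and bloom filter files.
    response.getLogInfos().forEach(logInfo -> logger.info("Log segment metadata: {}", logInfo));
  }
} finally {
  connectionPool.checkInConnection(channel);
}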
diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 8465602c51..ecb69844c7 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -1684,65 +1684,57 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In * Handler for FileMetadataRequest */ void handleFileCopyGetMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { + long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); + long totalTimeSpent = requestQueueTime; + long startTime = SystemTime.getInstance().milliseconds(); + FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest = FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); - + FileCopyGetMetaDataResponse response = null; List logInfos; try { - List logSegments = - storeManager.getLogSegmentMetadataFiles(fileCopyGetMetaDataRequest.getPartitionId(), true); - + List logSegments = storeManager.getLogSegmentMetadataFiles( + fileCopyGetMetaDataRequest.getPartitionId(), true); logInfos = convertStoreToProtocolLogInfo(logSegments); + + response = new FileCopyGetMetaDataResponse( + FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, + fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(), + logInfos.size(), logInfos, ServerErrorCode.No_Error); } catch (Exception e) { logger.error("Error while getting log segment metadata for partition {}", fileCopyGetMetaDataRequest.getPartitionId().getId(), e); - FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( + + response = new FileCopyGetMetaDataResponse( FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(), - 0, null, ServerErrorCode.Unknown_Error); - requestResponseChannel.sendResponse(response, request, null); - return; - } - FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse( - FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1, - fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(), - logInfos.size(), logInfos, ServerErrorCode.No_Error); - - // TODO: Add metrics for this operation - Histogram dummyHistogram = new Histogram(new Reservoir() { - @Override - public int size() { - return 0; - } - - @Override - public void update(long value) { - } - - @Override - public Snapshot getSnapshot() { - return null; - } - }); - ServerNetworkResponseMetrics serverNetworkResponseMetrics = new ServerNetworkResponseMetrics(dummyHistogram, - dummyHistogram, dummyHistogram, null, null, 0); + 0, new ArrayList<>(), ServerErrorCode.Unknown_Error); + } finally { + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + totalTimeSpent += processingTime; + publicAccessLogger.info("{} {} processingTime {}", fileCopyGetMetaDataRequest, response, processingTime); - logger.info("Dw: Api response, partition-" + fileCopyGetMetaDataRequest.getPartitionId().getId() + " " + response); - requestResponseChannel.sendResponse(response, request, serverNetworkResponseMetrics); + // Update request metrics. 
+ RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater( + requestQueueTime, processingTime, 0, 0, false); + fileCopyGetMetaDataRequest.accept(metricsUpdater); + } + requestResponseChannel.sendResponse(response, request, + new ServerNetworkResponseMetrics(metrics.fileCopyGetMetadataResponseQueueTimeInMs, + metrics.fileCopyGetMetadataSendTimeInMs, metrics.fileCopyGetMetadataTotalTimeInMs, + null, null, totalTimeSpent)); } private List convertStoreToProtocolLogInfo(List logSegments) { List logInfos = new ArrayList<>(); for (com.github.ambry.store.LogInfo logSegment : logSegments) { List indexSegments = new ArrayList<>(); - logSegment.getIndexSegments().forEach(indexSegment -> { - indexSegments.add(new com.github.ambry.protocol.FileInfo(indexSegment.getFileName(), indexSegment.getFileSize())); - }); + logSegment.getIndexSegments().forEach(indexSegment -> + indexSegments.add(new com.github.ambry.protocol.FileInfo(indexSegment.getFileName(), indexSegment.getFileSize()))); List bloomFilters = new ArrayList<>(); - logSegment.getBloomFilters().forEach(bloomFilter -> { - bloomFilters.add(new com.github.ambry.protocol.FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSize())); - }); + logSegment.getBloomFilters().forEach(bloomFilter -> + bloomFilters.add(new com.github.ambry.protocol.FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSize()))); logInfos.add(new LogInfo( logSegment.getLogSegment().getFileName(), logSegment.getLogSegment().getFileSize(), @@ -2128,6 +2120,20 @@ public void visit(BatchDeleteRequest deleteRequest) { } + @Override + public void visit(FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest) { + metrics.fileCopyGetMetadataRequestQueueTimeInMs.update(requestQueueTime); + metrics.fileCopyGetMetadataRequestRate.mark(); + metrics.fileCopyGetMetadataProcessingTimeInMs.update(requestProcessingTime); + responseQueueTimeHistogram = metrics.fileCopyGetMetadataResponseQueueTimeInMs; + responseSendTimeHistogram = metrics.fileCopyGetMetadataSendTimeInMs; + requestTotalTimeHistogram = metrics.fileCopyGetMetadataTotalTimeInMs; + if (isRequestDropped) { + metrics.fileCopyGetMetadataDroppedRate.mark(); + metrics.totalRequestDroppedRate.mark(); + } + } + @Override public void visit(UndeleteRequest undeleteRequest) { metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime); diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 74292fec2c..ece2deb896 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -1330,22 +1330,10 @@ List getLogSegmentMetadataFiles(boolean includeActiveLogSegment) { List sealedLogsAndMetaDataFiles = getLogSegments(includeActiveLogSegment); if (null != sealedLogsAndMetaDataFiles) { for (FileInfo E : sealedLogsAndMetaDataFiles) { - logger.info("[Dw] LS file: {} size: {}", E.getFileName(), E.getFileSize()); - LogSegmentName logSegmentName = LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX); - List allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment(dataDir, logSegmentName); - if (null != allIndexSegmentsForLogSegment) { - for (FileInfo is : allIndexSegmentsForLogSegment) { - logger.info("[Dw] IS file: {} size: {}", is.getFileName(), is.getFileSize()); - } - } List bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment(dataDir, logSegmentName); - if (null != bloomFiltersForLogSegment) { - for (FileInfo bf : 
bloomFiltersForLogSegment) { - logger.info("[Dw] BF file: {} size: {}", bf.getFileName(), bf.getFileSize()); - } - } + result.add(new LogInfo(E, allIndexSegmentsForLogSegment, bloomFiltersForLogSegment)); } }
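A short usage sketch of the refactored store-side collection, under the assumption that store is a started BlobStore and that the store-level LogInfo exposes a getLogSegment() accessor alongside the getIndexSegments()/getBloomFilters() getters added earlier in this series; this is illustrative only, not part of the patch.

// Hypothetical usage: gather sealed log segment metadata (active segment excluded) for one store.
List<LogInfo> logSegmentMetadata = store.getLogSegmentMetadataFiles(false);
for (LogInfo logInfo : logSegmentMetadata) {
  FileInfo logSegment = logInfo.getLogSegment();  // assumed accessor for the log segment FileInfo
  logger.info("Log segment {} ({} bytes): {} index segments, {} bloom filters",
      logSegment.getFileName(), logSegment.getFileSize(),
      logInfo.getIndexSegments().size(), logInfo.getBloomFilters().size());
}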