Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
DevenAhluwalia committed Feb 8, 2025
1 parent 2696aa9 commit 544e5e8
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 62 deletions.
14 changes: 12 additions & 2 deletions ambry-api/src/main/java/com/github/ambry/store/FileInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public class FileInfo {
private final String fileName;
private final long fileSize;

public FileInfo(String fileName, Long fileSize) {
public FileInfo(
String fileName,
Long fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
}
Expand All @@ -30,4 +32,12 @@ public String getFileName() {
public Long getFileSize() {
return fileSize;
}
}

@Override
public String toString() {
return "FileInfo{" +
"fileName='" + fileName + '\'' +
", fileSize=" + fileSize +
'}';
}
}
14 changes: 13 additions & 1 deletion ambry-api/src/main/java/com/github/ambry/store/LogInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ public class LogInfo {
private final List<FileInfo> indexSegments;
private final List<FileInfo> bloomFilters;

public LogInfo(FileInfo logSegment, List<FileInfo> indexSegments, List<FileInfo> bloomFilters) {
public LogInfo(
FileInfo logSegment,
List<FileInfo> indexSegments,
List<FileInfo> bloomFilters) {
this.logSegment = logSegment;
this.indexSegments = indexSegments;
this.bloomFilters = bloomFilters;
Expand All @@ -42,4 +45,13 @@ public List<FileInfo> getIndexSegments() {
public List<FileInfo> getBloomFilters() {
return Collections.unmodifiableList(bloomFilters);
}

@Override
public String toString() {
return "LogInfo{" +
"logSegment=" + logSegment +
", indexSegments=" + indexSegments +
", bloomFilters=" + bloomFilters +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public class ServerMetrics {
public final Histogram deleteBlobProcessingTimeInMs;
public final Histogram deleteBlobResponseQueueTimeInMs;

public final Histogram fileCopyGetMetadataRequestQueueTimeInMs;
public final Histogram fileCopyGetMetadataProcessingTimeInMs;
public final Histogram fileCopyGetMetadataResponseQueueTimeInMs;
public final Histogram fileCopyGetMetadataSendTimeInMs;
public final Histogram fileCopyGetMetadataTotalTimeInMs;
public final Meter fileCopyGetMetadataRequestRate;
public final Meter fileCopyGetMetadataDroppedRate;


public final Histogram batchDeleteBlobRequestQueueTimeInMs;
public final Histogram batchDeleteBlobProcessingTimeInMs;
public final Histogram batchDeleteBlobResponseQueueTimeInMs;
Expand Down Expand Up @@ -419,6 +428,22 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
deleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobProcessingTime"));
deleteBlobResponseQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobResponseQueueTime"));

fileCopyGetMetadataRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataRequestQueueTimeInMs"));
fileCopyGetMetadataProcessingTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataProcessingTimeInMs"));
fileCopyGetMetadataResponseQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataResponseQueueTimeInMs"));
fileCopyGetMetadataSendTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataSendTimeInMs"));
fileCopyGetMetadataTotalTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetMetadataTotalTimeInMs"));
fileCopyGetMetadataRequestRate =
registry.meter(MetricRegistry.name(requestClass, "FileCopyGetMetadataRequestRate"));
fileCopyGetMetadataDroppedRate =
registry.meter(MetricRegistry.name(requestClass, "FileCopyGetMetadataDroppedRate"));

batchDeleteBlobRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "BatchDeleteBlobRequestQueueTimeInMs"));
batchDeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "BatchDeleteBlobProcessingTimeInMs"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import javax.annotation.Nonnull;


public class FileCopyGetMetaDataRequest extends RequestOrResponse{
public class FileCopyGetMetaDataRequest extends RequestOrResponse {
private final PartitionId partitionId;
private final String hostName;
public static final short File_Metadata_Request_Version_V1 = 1;
Expand Down Expand Up @@ -56,10 +56,12 @@ public static FileCopyGetMetaDataRequest readFrom(
@Nonnull ClusterMap clusterMap) throws IOException {
short versionId = stream.readShort();
validateVersion(versionId);

int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
String hostName = Utils.readIntString(stream);
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);

return new FileCopyGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName);
}

Expand All @@ -73,10 +75,17 @@ public String toString() {
return sb.toString();
}

@Override
public void accept(RequestVisitor visitor) {
visitor.visit(this);
}

@Override
public long sizeInBytes() {
return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length;
}

@Override
protected void prepareBuffer() {
super.prepareBuffer();
Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class FileCopyGetMetaDataResponse extends Response {
static short CURRENT_VERSION = File_Copy_Protocol_Metadata_Response_Version_V1;

public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles,
List<LogInfo> logInfos, ServerErrorCode errorCode) {
@Nonnull List<LogInfo> logInfos, @Nonnull ServerErrorCode errorCode) {
super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode);

validateVersion(versionId);
Expand Down Expand Up @@ -60,14 +60,15 @@ public static FileCopyGetMetaDataResponse readFrom(
//Setting the number of logfiles to 0 as there are no logfiles to be read.
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, 0, new ArrayList<>(), errorCode);
}

int numberOfLogfiles = stream.readInt();
List<LogInfo> logInfos = new ArrayList<>();
for (int i = 0; i < numberOfLogfiles; i++) {
logInfos.add(LogInfo.readFrom(stream));
}
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfos, errorCode);
}

@Override
protected void prepareBuffer() {
super.prepareBuffer();
bufferToSend.writeInt(numberOfLogfiles);
Expand All @@ -76,6 +77,7 @@ protected void prepareBuffer() {
}
}

@Override
public long sizeInBytes() {
return super.sizeInBytes() + Integer.BYTES + logInfos.stream().mapToLong(LogInfo::sizeInBytes).sum();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FileInfo {

public FileInfo(
@Nonnull String fileName,
@Nonnull long fileSize) {
long fileSize) {
this.fileName = fileName;
this.fileSizeInBytes = fileSize;
}
Expand All @@ -56,8 +56,10 @@ public void writeTo(@Nonnull ByteBuf buf) {

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes)
.append("]");
sb.append("FileInfo[")
.append("FileName=").append(fileName)
.append(", FileSizeInBytes=").append(fileSizeInBytes)
.append("]");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public interface RequestVisitor {
*/
void visit(BatchDeleteRequest deleteRequest);

/**
* Performs any actions related to Batch Delete request.
* @param fileCopyGetMetaDataRequest to visit.
*/
void visit(FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest);

/**
* Performs any actions related to Un-delete request.
* @param undeleteRequest to visit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1684,65 +1684,57 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In
* Handler for FileMetadataRequest
*/
void handleFileCopyGetMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException {
long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs();
long totalTimeSpent = requestQueueTime;
long startTime = SystemTime.getInstance().milliseconds();

FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest =
FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap);

FileCopyGetMetaDataResponse response = null;
List<LogInfo> logInfos;
try {
List<com.github.ambry.store.LogInfo> logSegments =
storeManager.getLogSegmentMetadataFiles(fileCopyGetMetaDataRequest.getPartitionId(), true);

List<com.github.ambry.store.LogInfo> logSegments = storeManager.getLogSegmentMetadataFiles(
fileCopyGetMetaDataRequest.getPartitionId(), true);
logInfos = convertStoreToProtocolLogInfo(logSegments);

response = new FileCopyGetMetaDataResponse(
FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1,
fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(),
logInfos.size(), logInfos, ServerErrorCode.No_Error);
} catch (Exception e) {
logger.error("Error while getting log segment metadata for partition {}",
fileCopyGetMetaDataRequest.getPartitionId().getId(), e);
FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse(

response = new FileCopyGetMetaDataResponse(
FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1,
fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(),
0, null, ServerErrorCode.Unknown_Error);
requestResponseChannel.sendResponse(response, request, null);
return;
}
FileCopyGetMetaDataResponse response = new FileCopyGetMetaDataResponse(
FileCopyGetMetaDataResponse.File_Copy_Protocol_Metadata_Response_Version_V1,
fileCopyGetMetaDataRequest.getCorrelationId(), fileCopyGetMetaDataRequest.getClientId(),
logInfos.size(), logInfos, ServerErrorCode.No_Error);

// TODO: Add metrics for this operation
Histogram dummyHistogram = new Histogram(new Reservoir() {
@Override
public int size() {
return 0;
}

@Override
public void update(long value) {
}

@Override
public Snapshot getSnapshot() {
return null;
}
});
ServerNetworkResponseMetrics serverNetworkResponseMetrics = new ServerNetworkResponseMetrics(dummyHistogram,
dummyHistogram, dummyHistogram, null, null, 0);
0, new ArrayList<>(), ServerErrorCode.Unknown_Error);
} finally {
long processingTime = SystemTime.getInstance().milliseconds() - startTime;
totalTimeSpent += processingTime;
publicAccessLogger.info("{} {} processingTime {}", fileCopyGetMetaDataRequest, response, processingTime);

logger.info("Dw: Api response, partition-" + fileCopyGetMetaDataRequest.getPartitionId().getId() + " " + response);
requestResponseChannel.sendResponse(response, request, serverNetworkResponseMetrics);
// Update request metrics.
RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater(
requestQueueTime, processingTime, 0, 0, false);
fileCopyGetMetaDataRequest.accept(metricsUpdater);
}
requestResponseChannel.sendResponse(response, request,
new ServerNetworkResponseMetrics(metrics.fileCopyGetMetadataResponseQueueTimeInMs,
metrics.fileCopyGetMetadataSendTimeInMs, metrics.fileCopyGetMetadataTotalTimeInMs,
null, null, totalTimeSpent));
}

private List<LogInfo> convertStoreToProtocolLogInfo(List<com.github.ambry.store.LogInfo> logSegments) {
List<LogInfo> logInfos = new ArrayList<>();
for (com.github.ambry.store.LogInfo logSegment : logSegments) {
List<com.github.ambry.protocol.FileInfo> indexSegments = new ArrayList<>();
logSegment.getIndexSegments().forEach(indexSegment -> {
indexSegments.add(new com.github.ambry.protocol.FileInfo(indexSegment.getFileName(), indexSegment.getFileSize()));
});
logSegment.getIndexSegments().forEach(indexSegment ->
indexSegments.add(new com.github.ambry.protocol.FileInfo(indexSegment.getFileName(), indexSegment.getFileSize())));

List<com.github.ambry.protocol.FileInfo> bloomFilters = new ArrayList<>();
logSegment.getBloomFilters().forEach(bloomFilter -> {
bloomFilters.add(new com.github.ambry.protocol.FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSize()));
});
logSegment.getBloomFilters().forEach(bloomFilter ->
bloomFilters.add(new com.github.ambry.protocol.FileInfo(bloomFilter.getFileName(), bloomFilter.getFileSize())));

logInfos.add(new LogInfo(
logSegment.getLogSegment().getFileName(), logSegment.getLogSegment().getFileSize(),
Expand Down Expand Up @@ -2128,6 +2120,20 @@ public void visit(BatchDeleteRequest deleteRequest) {

}

@Override
public void visit(FileCopyGetMetaDataRequest fileCopyGetMetaDataRequest) {
metrics.fileCopyGetMetadataRequestQueueTimeInMs.update(requestQueueTime);
metrics.fileCopyGetMetadataRequestRate.mark();
metrics.fileCopyGetMetadataProcessingTimeInMs.update(requestProcessingTime);
responseQueueTimeHistogram = metrics.fileCopyGetMetadataResponseQueueTimeInMs;
responseSendTimeHistogram = metrics.fileCopyGetMetadataSendTimeInMs;
requestTotalTimeHistogram = metrics.fileCopyGetMetadataTotalTimeInMs;
if (isRequestDropped) {
metrics.fileCopyGetMetadataDroppedRate.mark();
metrics.totalRequestDroppedRate.mark();
}
}

@Override
public void visit(UndeleteRequest undeleteRequest) {
metrics.undeleteBlobRequestQueueTimeInMs.update(requestQueueTime);
Expand Down
14 changes: 1 addition & 13 deletions ambry-store/src/main/java/com/github/ambry/store/BlobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1330,22 +1330,10 @@ List<LogInfo> getLogSegmentMetadataFiles(boolean includeActiveLogSegment) {
List<FileInfo> sealedLogsAndMetaDataFiles = getLogSegments(includeActiveLogSegment);
if (null != sealedLogsAndMetaDataFiles) {
for (FileInfo E : sealedLogsAndMetaDataFiles) {
logger.info("[Dw] LS file: {} size: {}", E.getFileName(), E.getFileSize());

LogSegmentName logSegmentName = LogSegmentName.fromFilename(E.getFileName() + LogSegmentName.SUFFIX);

List<FileInfo> allIndexSegmentsForLogSegment = getAllIndexSegmentsForLogSegment(dataDir, logSegmentName);
if (null != allIndexSegmentsForLogSegment) {
for (FileInfo is : allIndexSegmentsForLogSegment) {
logger.info("[Dw] IS file: {} size: {}", is.getFileName(), is.getFileSize());
}
}
List<FileInfo> bloomFiltersForLogSegment = getAllBloomFiltersForLogSegment(dataDir, logSegmentName);
if (null != bloomFiltersForLogSegment) {
for (FileInfo bf : bloomFiltersForLogSegment) {
logger.info("[Dw] BF file: {} size: {}", bf.getFileName(), bf.getFileSize());
}
}

result.add(new LogInfo(E, allIndexSegmentsForLogSegment, bloomFiltersForLogSegment));
}
}
Expand Down

0 comments on commit 544e5e8

Please sign in to comment.