Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FileCopy based bootstrap] Integration branch for demo #2993

Draft
wants to merge 29 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3c5701b
WIP
DevenAhluwalia Jan 9, 2025
2ad8692
WIP
DevenAhluwalia Jan 14, 2025
0ce0d91
WIP
DevenAhluwalia Jan 14, 2025
2a23d33
WIP
DevenAhluwalia Jan 17, 2025
4d89d8a
WIP
DevenAhluwalia Jan 21, 2025
50bef4f
Merge branch 'master' of https://github.com/linkedin/ambry into dw-fi…
DevenAhluwalia Jan 21, 2025
5a9feac
Added changes for filestore implementation for FileCopy
Jan 22, 2025
6105b5c
WIP
DevenAhluwalia Jan 22, 2025
b91f2a3
WIP
DevenAhluwalia Jan 22, 2025
201a47d
WIP
DevenAhluwalia Jan 25, 2025
9bcc85a
WIP (#2995)
DevenAhluwalia Jan 27, 2025
1b9b03e
WIP
DevenAhluwalia Jan 28, 2025
439733d
Added changes for Filestore's read chunk
Jan 28, 2025
675f217
Added test util for file chunking and aggregating to output file with…
Jan 28, 2025
37fe089
WIP
DevenAhluwalia Jan 29, 2025
cb80572
Merge branch 'dr_demo1' of https://github.com/linkedin/ambry into dr_…
DevenAhluwalia Jan 29, 2025
7bb1dac
WIP
DevenAhluwalia Jan 29, 2025
9cb483e
Resolving conflicts
aga9900 Jan 29, 2025
5914eba
WIP
DevenAhluwalia Jan 29, 2025
7dcc9dd
Merge branch 'dr_demo1' of https://github.com/linkedin/ambry into dr_…
DevenAhluwalia Jan 29, 2025
b0fe2f9
Updating FileCopy Meta Data Response
aga9900 Jan 29, 2025
3736b39
Updating State Build Flow For test Partition
aga9900 Jan 29, 2025
40ab7d0
Updating Sleep Time For Thread to 1 hour
aga9900 Jan 29, 2025
70326f3
Adding state build on top of copied files using Filestore
Jan 29, 2025
fda8548
Added test util for state build verification
Jan 30, 2025
7e25168
Final step for T3 demo: getBlob operations post state build for T3 de…
Jan 30, 2025
c56816f
Merge branch 'master' of https://github.com/linkedin/ambry into dr_demo1
DevenAhluwalia Feb 3, 2025
cd360f1
Merge branch 'dr_demo1' of https://github.com/linkedin/ambry into dr_…
DevenAhluwalia Feb 3, 2025
5108f08
WIP
DevenAhluwalia Feb 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.ambry.clustermap;

public class FileStoreException extends RuntimeException{

private static final long serialVersionUID = 1L;
private final FileStoreErrorCode error;

public FileStoreException(String s, FileStoreErrorCode error) {
super(s);
this.error = error;
}

public FileStoreException(String s, FileStoreErrorCode error, Throwable throwable) {
super(s, throwable);
this.error = error;
}

public enum FileStoreErrorCode{
FileStoreRunningFailure
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ public enum TransitionErrorCode {
/**
* If Bootstap Controller fails in pre-filecopy steps for specific replica.
*/
BootstrapControllerFailure
BootstrapControllerFailure,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.github.ambry.config;

public class FileCopyConfig {

public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk";
@Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK)
public final int parallelPartitionHydrationCountPerDisk;

public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads";
@Config(NUMBER_OF_FILE_COPY_THREADS)
public final int numberOfFileCopyThreads;

public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes";
@Config(FILE_CHUNK_TIMEOUT_IN_MINUTES)
public final long fileChunkTimeoutInMins;

/**
* The frequency at which the data gets flushed to disk
*/
public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.interval.In.MBs";
@Config(STORE_DATA_FLUSH_INTERVAL_IN_MBS)
@Default("1000")
public final long storeDataFlushIntervalInMbs;

public static final String FILE_COPY_META_DATA_FILE_NAME = "file.copy.meta.data.file.name";
@Config(FILE_COPY_META_DATA_FILE_NAME)
@Default("sealed_logs_metadata_file")
public final String fileCopyMetaDataFileName;

public FileCopyConfig(VerifiableProperties verifiableProperties) {
fileCopyMetaDataFileName = verifiableProperties.getString(FILE_COPY_META_DATA_FILE_NAME, "sealed_logs_metadata_file");
parallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4);
fileChunkTimeoutInMins = verifiableProperties.getInt(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5);
storeDataFlushIntervalInMbs = verifiableProperties.getLong(STORE_DATA_FLUSH_INTERVAL_IN_MBS, 1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ public Http2ClientConfig(VerifiableProperties verifiableProperties) {
http2DropRequestOnWriteAndFlushTimeout =
verifiableProperties.getBoolean(HTTP2_DROP_REQUEST_ON_WRITE_AND_FLUSH_TIMEOUT, false);
http2BlockingChannelAcquireTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_ACQUIRE_TIMEOUT_MS, 1000);
http2BlockingChannelSendTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_SEND_TIMEOUT_MS, 2000);
http2BlockingChannelReceiveTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_RECEIVE_TIMEOUT_MS, 5000);
http2BlockingChannelSendTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_SEND_TIMEOUT_MS, 99999);
http2BlockingChannelReceiveTimeoutMs = verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_RECEIVE_TIMEOUT_MS, 99999);
http2BlockingChannelPoolShutdownTimeoutMs =
verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_POOL_SHUTDOWN_TIMEOUT_MS, 3000);
verifiableProperties.getInt(HTTP2_BLOCKING_CHANNEL_POOL_SHUTDOWN_TIMEOUT_MS, 99999);
http2PeerCertificateSanRegex = verifiableProperties.getString(HTTP2_PEER_CERTIFICATE_SAN_REGEX, "");
http2TimeoutAsNetworkError = verifiableProperties.getBoolean(HTTP2_TIMEOUT_AS_NETWORK_ERROR, false);
http2RequestAdditionalTimeoutMs = verifiableProperties.getInt(HTTP2_REQUEST_ADDITIONAL_TIMEOUT_MS, 2500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.store.ChunkResponse;
import com.github.ambry.store.FileStore;
import com.github.ambry.store.LogInfo;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -51,6 +55,8 @@ public interface StoreManager {
*/
boolean addFileStore(ReplicaId replicaId);

void setUpReplica(String partitionName);

/**
* Build state after filecopy is completed
* @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built
Expand Down Expand Up @@ -153,4 +159,23 @@ public interface StoreManager {
* @throws IOException
*/
boolean isFilesExistForPattern(PartitionId partitionId, Pattern pattern) throws IOException;

/**
* Get the list of log segment metadata files for a given partition.
* @param partitionId
* @param includeActiveLogSegment
* @return List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles
*/
List<LogInfo> getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment);

/**
* Get the chunk from the requested file (Log segment, Index File, Bloom Filter).
* @param partitionId
* @param fileName
* @param sizeInBytes
* @param startOffset
* @return FileInputStream containing the chunk of size {@code sizeInBytes} starting from {@code startOffset}.
*/
ChunkResponse getChunk(PartitionId partitionId, String fileName, long sizeInBytes, long startOffset)
throws IOException;
}
22 changes: 22 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/ChunkResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.github.ambry.store;

import java.io.DataInputStream;


public class ChunkResponse {
private final DataInputStream stream;
private final long chunkLength;

public ChunkResponse(DataInputStream stream, long chunkLength) {
this.stream = stream;
this.chunkLength = chunkLength;
}

public DataInputStream getStream() {
return stream;
}

public long getChunkLength() {
return chunkLength;
}
}
41 changes: 41 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;


public class FileInfo {
private final String fileName;
private final long fileSize;

public FileInfo(String fileName, Long fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
}

public String getFileName() {
return fileName;
}

public Long getFileSize() {
return fileSize;
}

@Override
public String toString() {
return "FileInfo{" +
"fileName='" + fileName + '\'' +
", fileSize=" + fileSize +
'}';
}
}
Loading
Loading