Skip to content

Commit

Permalink
added constant to correct file
Browse files Browse the repository at this point in the history
  • Loading branch information
allenaverbukh committed Jan 30, 2025
2 parents 21935d9 + 5df7368 commit c1446db
Show file tree
Hide file tree
Showing 68 changed files with 3,481 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,17 @@ public void testRenameDatasetVersion() throws Exception {
mySqlAccountStore.getDatasetVersion(testAccount.getId(), testContainer.getId(), testAccount.getName(),
testContainer.getName(), DATASET_NAME_RENAME, sourceVersion);
assertNull("Rename from should be null", datasetVersionRecord.getRenameFrom());

//rename a deleted version should return not found
mySqlAccountStore.deleteDatasetVersion(testAccount.getId(), testContainer.getId(), DATASET_NAME_RENAME, sourceVersion);
targetVersion = "5.5.5.5";
try {
mySqlAccountStore.renameDatasetVersion(testAccount.getId(), testContainer.getId(), testAccount.getName(),
testContainer.getName(), DATASET_NAME_RENAME, renamedSourceVersion, targetVersion);
fail();
} catch (AccountServiceException e) {
assertEquals("Mismatch on error code", AccountServiceErrorCode.NotFound, e.getErrorCode());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ public DatasetDao(MySqlDataAccessor dataAccessor, MySqlAccountServiceConfig mySq
//copy a dataset version from a source version.
//We need to update the modify time so counter based purge policy won't delete it.
copyToNewDatasetVersionSql = String.format(
"INSERT INTO %1$s (%2$s, %3$s, %4$s, %5$s, %6$s, %7$s, %8$s, %9$s, %10$s) SELECT %2$s, %3$s, %4$s, ?, "
+ "%6$s, NOW(3), %8$s, %9$s, ? FROM %1$s WHERE %2$s = ? AND %3$s = ? AND %4$s = ? AND %5$s = ? AND %9$s = ?",
"INSERT INTO %1$s (%2$s, %3$s, %4$s, %5$s, %6$s, %7$s, %8$s, %9$s, %10$s) "
+ "SELECT %2$s, %3$s, %4$s, ?, %6$s, NOW(3), %8$s, %9$s, ? "
+ "FROM %1$s WHERE %2$s = ? AND %3$s = ? AND %4$s = ? AND %5$s = ? AND %9$s = ? "
+ "AND (%8$s IS NULL OR %8$s > NOW(3))",
DATASET_VERSION_TABLE, ACCOUNT_ID, CONTAINER_ID, DATASET_NAME, VERSION, CREATION_TIME, LAST_MODIFIED_TIME,
DELETED_TS, DATASET_VERSION_STATE, RENAME_FROM);
//dataset version has in_progress and ready states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public interface ReplicaSyncUpManager {
*/
void initiateBootstrap(ReplicaId replicaId);

/**
* Initiate File Copy process if the replica is newly added and needs to catch up with peer ones using File Copy Protocol.
* @param replicaId the replica to initiate File Copy Based Replication
*/
void initiateFileCopy(ReplicaId replicaId);

/**
* Wait until bootstrap on given replica is complete.
* until given replica has caught up with enough peer replicas either in local DC or remote DCs
Expand All @@ -36,6 +42,14 @@ public interface ReplicaSyncUpManager {
*/
void waitBootstrapCompleted(String partitionName) throws InterruptedException;

/**
* Wait until All Sealed Log, Index and Bloom Files are copied to the given replica from a remote replica.
* @param partitionName partition name of replica that in file copy state
* @throws InterruptedException
*/
void waitForFileCopyCompleted(String partitionName) throws InterruptedException;


/**
* Update replica lag (in byte) between two replicas (local and peer replica) and check sync-up status.
* @param localReplica the replica that resides on current node
Expand Down Expand Up @@ -64,6 +78,12 @@ boolean updateReplicaLagAndCheckSyncStatus(ReplicaId localReplica, ReplicaId pee
*/
void onBootstrapComplete(ReplicaId replicaId);

/**
* Called when file based replication of a replica is complete.
* @param replicaId The replica which completes file Copy
*/
void onFileCopyComplete(ReplicaId replicaId);

/**
* Deactivation on given replica is complete.
* @param replicaId the replica which completes deactivation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,17 @@ public enum StateModelListenerType {
* leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate
* data from VCR nodes. This is part of two-way replication between Ambry and cloud.
*/
CloudToStoreReplicationManagerListener
CloudToStoreReplicationManagerListener,

/**
* The partition state change listener owned by file copy manager. It takes actions when new replica is added (OFFLINE ->
* BOOTSTRAP)
*/
FileCopyManagerListener,

/**
* The partition state change listener owned by Bootstrap Controller.
* It takes actions when Offline -> Bootstrap state transition is called for a partition.
*/
BootstrapControllerListener
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public enum TransitionErrorCode {
/**
* If the resource name is not a numeric number.
*/
InvalidResourceName
InvalidResourceName,

/**
* If File Based Replication Protocol fails at some point for specific replica.
*/
FileCopyProtocolFailure,

/**
* If Bootstap Controller fails in pre-filecopy steps for specific replica.
*/
BootstrapControllerFailure
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright 2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileCopyBasedReplicationConfig {

private static final Logger logger = LoggerFactory.getLogger(FileCopyBasedReplicationConfig.class);

/**
* The number of partitions that can be hydrated in parallel per disk
*/
public static final String FILE_COPY_PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "filecopy.parallel.partition.hydration.count.per.disk";
@Config(FILE_COPY_PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK)
public final int fileCopyParallelPartitionHydrationCountPerDisk;

/**
* The number of threads that can be used to copy files
*/
public static final String FILE_COPY_NUMBER_OF_FILE_COPY_THREADS = "filecopy.number.of.file.copy.threads";
@Config(FILE_COPY_NUMBER_OF_FILE_COPY_THREADS)
public final int fileCopyNumberOfFileCopyThreads;

/**
* The timeout for each file chunk
*/
public static final String FILE_COPY_FILE_CHUNK_TIMEOUT_IN_MINUTES = "filecopy.file.chunk.timeout.in.minutes";
@Config(FILE_COPY_FILE_CHUNK_TIMEOUT_IN_MINUTES)
public final long fileCopyFileChunkTimeoutInMins;

/**
* The frequency at which the data gets flushed to disk
*/
public static final String FILE_COPY_DATA_FLUSH_INTERVAL_IN_MBS = "filecopy.data.flush.interval.in.mbs";
@Config(FILE_COPY_DATA_FLUSH_INTERVAL_IN_MBS)
@Default("1000")
public final long fileCopyDataFlushIntervalInMbs;

/**
* The name of the file that stores the metadata for the file copy
*/
public static final String File_COPY_META_DATA_FILE_NAME = "filecopy.meta.data.file.name";
@Config(File_COPY_META_DATA_FILE_NAME)
@Default("sealed_segments_metadata_file")
public final String fileCopyMetaDataFileName;


public FileCopyBasedReplicationConfig(VerifiableProperties verifiableProperties) {
fileCopyMetaDataFileName = verifiableProperties.getString(File_COPY_META_DATA_FILE_NAME, "sealed_segments_metadata_file");
fileCopyParallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(FILE_COPY_PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
fileCopyNumberOfFileCopyThreads = verifiableProperties.getInt(FILE_COPY_NUMBER_OF_FILE_COPY_THREADS, 4);
fileCopyFileChunkTimeoutInMins = verifiableProperties.getInt(FILE_COPY_FILE_CHUNK_TIMEOUT_IN_MINUTES, 5);
fileCopyDataFlushIntervalInMbs = verifiableProperties.getLong(FILE_COPY_DATA_FLUSH_INTERVAL_IN_MBS, 1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ public class ServerConfig {
@Config(SERVER_EXECUTION_MODE)
public final ServerExecutionMode serverExecutionMode;

/**
* Decide Replication Protocol For Hydration Of Newly Added Replicas
*/
public static final String SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION = "server.replication.protocol.for.hydration";
@Config(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION)
public final ServerReplicationMode serverReplicationProtocolForHydration;

public ServerConfig(VerifiableProperties verifiableProperties) {
serverExecutionMode = verifiableProperties.getEnum(SERVER_EXECUTION_MODE, ServerExecutionMode.class,
ServerExecutionMode.DATA_SERVING_MODE);
Expand Down Expand Up @@ -176,5 +183,7 @@ public ServerConfig(VerifiableProperties verifiableProperties) {
serverSecurityServiceFactory = verifiableProperties.getString("server.security.service.factory",
"com.github.ambry.server.AmbryServerSecurityServiceFactory");
serverRepairRequestsDbFactory = verifiableProperties.getString("server.repair.requests.db.factory", null);
serverReplicationProtocolForHydration = verifiableProperties.getEnum(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION,
ServerReplicationMode.class, ServerReplicationMode.BLOB_BASED);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.config;
public enum ServerReplicationMode {
BLOB_BASED,
FILE_BASED;
}
31 changes: 30 additions & 1 deletion ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,33 @@ public class StoreConfig {
@Config("store.reshuffle.disks.on.reorder")
@Default("false")
public final boolean storeReshuffleDisksOnReorder;

public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder";

/**
* Name Of file to Store Sealed Segments Progress Status
*/
public static final String STORE_FILE_COPY_IN_PROGRESS_FILE_NAME = "store.file.copy.in.progress.file.name";
@Config(STORE_FILE_COPY_IN_PROGRESS_FILE_NAME)
@Default("file_copy_in_progress")
public final String storeFileCopyInProgressFileName;

/**
* Name of file to Store Sealed Segments Completion Status
*/
public static final String STORE_FILE_COPY_COMPLETED_FILE_NAME = "store.file.copy.completed.file.name";
@Config(STORE_FILE_COPY_COMPLETED_FILE_NAME)
@Default("file_copy_completed")
public final String storeFileCopyCompletedFileName;

/**
* Name Of file to Store Bootstrap Progress Status
*/
public static final String STORE_BOOTSTRAP_IN_PROGRESS_FILE = "store.bootstrap.in.progress.file.name";
@Config(STORE_BOOTSTRAP_IN_PROGRESS_FILE)
@Default("bootstrap_in_progress")
public final String storeBootstrapInProgressFile;

public StoreConfig(VerifiableProperties verifiableProperties) {
storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory");
storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60);
Expand Down Expand Up @@ -861,5 +886,9 @@ public StoreConfig(VerifiableProperties verifiableProperties) {
storeStaleTimeInDays = verifiableProperties.getIntInRange(storeStaleTimeInDaysName, 7, 0, Integer.MAX_VALUE);
storeBlockStaleBlobStoreToStart = verifiableProperties.getBoolean(storeBlockStaleBlobStoreToStartName, false);
storeReshuffleDisksOnReorder = verifiableProperties.getBoolean(storeReshuffleDisksOnReorderName, false);

storeFileCopyInProgressFileName = verifiableProperties.getString(STORE_FILE_COPY_IN_PROGRESS_FILE_NAME, "file_copy_in_progress");
storeBootstrapInProgressFile = verifiableProperties.getString(STORE_BOOTSTRAP_IN_PROGRESS_FILE, "bootstrap_in_progress");
storeFileCopyCompletedFileName = verifiableProperties.getString(STORE_FILE_COPY_COMPLETED_FILE_NAME, "file_copy_completed");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public static abstract class AbstractListBucketResult {
private List<Contents> contents;
@JacksonXmlProperty(localName = "EncodingType")
private String encodingType;
@JacksonXmlProperty(localName = "isTruncated")
@JacksonXmlProperty(localName = "IsTruncated")
private Boolean isTruncated;

private AbstractListBucketResult() {
Expand Down Expand Up @@ -319,6 +319,10 @@ public String getKey() {

public long getSize() { return size; }

public String getLastModified() {
return lastModified;
}

@Override
public String toString() {
return "Key=" + key + ", " + "LastModified=" + lastModified + ", " + "Size=" + size;
Expand Down
Loading

0 comments on commit c1446db

Please sign in to comment.