1. A new name was introduced for the metadata file written when a backup starts.
2. The restore process no longer crashes if backing up broken files fails.
andrii0lomakin committed Feb 9, 2024
1 parent c53eb22 commit 0da2afb
Showing 3 changed files with 95 additions and 33 deletions.
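The two changes are related: prepareForBackup() now writes its metadata under the new name START_BACKUP_METADATA_FILE_NAME ("start_backup-metadata") instead of BACKUP_METADATA_FILE_NAME, and the pre-restore copy of (possibly broken) segment files in Log.kt is now wrapped in a try/catch so that a failed copy is only logged and no longer aborts the restore. Below is a minimal, self-contained Kotlin sketch of that second, best-effort pattern; the names (SegmentFile, copyFilesBestEffort) and the println-based logging are placeholders for illustration, not the actual Xodus API.

import java.nio.file.Files
import java.nio.file.Path

// Hypothetical stand-in for the segment files preserved before a restore.
data class SegmentFile(val path: Path)

// Best-effort copy into a temp directory: any failure is logged and swallowed
// so the restore itself can still proceed (the pattern added to
// copyFilesInRestoreTempDir in Log.kt).
fun copyFilesBestEffort(files: List<SegmentFile>, tempDir: Path) {
    try {
        Files.createDirectories(tempDir)
        for (file in files) {
            println("Copying ${file.path} into $tempDir before data restore")
            Files.copy(file.path, tempDir.resolve(file.path.fileName))
        }
        println("Copying into $tempDir completed")
    } catch (e: Exception) {
        // Backing up broken files is optional; log and continue with the restore.
        System.err.println("Error during backup of broken files: ${e.message}")
    }
}

The design choice is that preserving the broken files is a convenience for later diagnostics, while restoring from the dynamic backup is the primary goal, so an I/O failure in the former should not block the latter. The diff follows.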
@@ -738,15 +738,16 @@ public void prepareForBackup() {
highAddressAndRoot[1], log.getCachePageSize(), log.getFileLengthBound(),
true, fileAddress, fileOffset);

var backupMetadata = Paths.get(log.getLocation()).resolve(BackupMetadata.BACKUP_METADATA_FILE_NAME);
var startBackupMetadata = Paths.get(log.getLocation()).resolve(
BackupMetadata.START_BACKUP_METADATA_FILE_NAME);
try {
Files.deleteIfExists(backupMetadata);
Files.deleteIfExists(startBackupMetadata);
} catch (IOException e) {
throw new ExodusException("Error deletion of previous backup metadata", e);
}

try (var channel = FileChannel.open(
backupMetadata, StandardOpenOption.CREATE_NEW,
startBackupMetadata, StandardOpenOption.CREATE_NEW,
StandardOpenOption.WRITE)) {
while (metadata.remaining() > 0) {
//noinspection ResultOfMethodCallIgnored
@@ -766,9 +767,10 @@ public void finishBackup() {
if (isOpen()) {
gc.resume();
}
var backupMetadata = Paths.get(log.getLocation()).resolve(BackupMetadata.BACKUP_METADATA_FILE_NAME);
var startBackupMetadata = Paths.get(log.getLocation()).resolve(
BackupMetadata.START_BACKUP_METADATA_FILE_NAME);
try {
Files.deleteIfExists(backupMetadata);
Files.deleteIfExists(startBackupMetadata);
} catch (IOException e) {
throw new ExodusException("Error deletion of previous backup metadata", e);
}
@@ -22,6 +22,7 @@
@SuppressWarnings({"unused", "FieldCanBeLocal"})
public class BackupMetadata extends StartupMetadata {
public static final String BACKUP_METADATA_FILE_NAME = "backup-metadata";
public static final String START_BACKUP_METADATA_FILE_NAME = "start_backup-metadata";

static final int LAST_FILE_ADDRESS = StartupMetadata.FILE_SIZE;
static final int LAST_FILE_OFFSET = LAST_FILE_ADDRESS + Long.BYTES;
115 changes: 87 additions & 28 deletions environment/src/main/kotlin/jetbrains/exodus/log/Log.kt
@@ -244,15 +244,22 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
}

copyFilesInRestoreTempDir(
blocks.tailMap(backupMetadata.lastFileAddress, true).values.iterator()
blocks.tailMap(
backupMetadata.lastFileAddress,
true
).values.iterator()
)

val blocksToTruncateIterator = blocks.tailMap(
backupMetadata.lastFileAddress,
false
).values.iterator()

truncateFile(lastSegmentFile.toFile(), backupMetadata.lastFileOffset, null)
truncateFile(
lastSegmentFile.toFile(),
backupMetadata.lastFileOffset,
null
)

if (blocksToTruncateIterator.hasNext()) {
val blockToDelete = blocksToTruncateIterator.next()
@@ -274,7 +281,10 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
startupMetadata = backupMetadata
restoredFromBackup = true
} catch (ex: Exception) {
logger.error("Failed to restore database $location from dynamic backup. ", ex)
logger.error(
"Failed to restore database $location from dynamic backup. ",
ex
)
}
}
}
@@ -425,7 +435,13 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
generationCount
)
else
SeparateLogCache(memoryUsage, cachePageSize, nonBlockingCache, useSoftReferences, generationCount)
SeparateLogCache(
memoryUsage,
cachePageSize,
nonBlockingCache,
useSoftReferences,
generationCount
)
} else {
val memoryUsagePercentage = config.memoryUsagePercentage
if (config.isSharedCache)
@@ -504,7 +520,13 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
if (!rwIsReadonly && writtenInPage > 0) {
logger.warn(
"Page ${(highAddress and (cachePageSize - 1).toLong())} is not written completely, fixing it. " +
"Environment : $location, file : ${LogUtil.getLogFilename(getFileAddress(highAddress))}."
"Environment : $location, file : ${
LogUtil.getLogFilename(
getFileAddress(
highAddress
)
)
}."
)

if (needToPerformMigration) {
@@ -543,16 +565,20 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
}

private fun copyFilesInRestoreTempDir(files: Iterator<FileDataReader.FileBlock>) {
val tempDir = createTempRestoreDirectoryWithDate()
logger.info("Database $location - copying files into the temp directory $tempDir before data restore.")
try {
val tempDir = createTempRestoreDirectoryWithDate()
logger.info("Database $location - copying files into the temp directory $tempDir before data restore.")

while (files.hasNext()) {
val file = files.next()
logger.info("Database $location - file $file is copied into the temp directory $tempDir")
Files.copy(file.toPath(), tempDir.resolve(file.toPath().fileName))
}
while (files.hasNext()) {
val file = files.next()
logger.info("Database $location - file $file is copied into the temp directory $tempDir")
Files.copy(file.toPath(), tempDir.resolve(file.toPath().fileName))
}

logger.info("Database $location - copying of files into the temp directory $tempDir is completed.")
logger.info("Database $location - copying of files into the temp directory $tempDir is completed.")
} catch (e: Exception) {
logger.error("Error during backup of broken files", e)
}
}

private fun createTempRestoreDirectoryWithDate(): Path {
@@ -648,7 +674,10 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
return startupMetadata.rootAddress
}

private fun checkLogConsistency(blockSetMutable: BlockSet.Mutable, loadedDbRootAddress: Long): Long {
private fun checkLogConsistency(
blockSetMutable: BlockSet.Mutable,
loadedDbRootAddress: Long
): Long {
var blockIterator = reader.blocks.iterator()
if (!blockIterator.hasNext()) {
return Long.MIN_VALUE
@@ -768,7 +797,11 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
try {
do {
if (nextBlockCorruptionMessage != null) {
DataCorruptionException.raise(nextBlockCorruptionMessage, this, corruptedFileAddress)
DataCorruptionException.raise(
nextBlockCorruptionMessage,
this,
corruptedFileAddress
)
}

val block = fileBlockIterator.next()
@@ -886,7 +919,11 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C

blockSetMutable.add(startBlockAddress, block)
if (!hasNext && nextBlockCorruptionMessage != null) {
DataCorruptionException.raise(nextBlockCorruptionMessage, this, corruptedFileAddress)
DataCorruptionException.raise(
nextBlockCorruptionMessage,
this,
corruptedFileAddress
)
}
} while (hasNext)
} catch (exception: Exception) {
@@ -933,7 +970,11 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
logger.warn(
"Data corruption was detected. Reason : \"${exception.message}\". " +
"Database '$location' will be truncated till address : $dbRootEndAddress. " +
"Name of the file to be truncated : ${LogUtil.getLogFilename(endBlockAddress)}. " +
"Name of the file to be truncated : ${
LogUtil.getLogFilename(
endBlockAddress
)
}. " +
"Initial file size ${endBlock.length()} bytes, final file size $endBlockLength bytes."
)

@@ -1398,7 +1439,12 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
return LoggableIterator(this, startAddress, highReadAddress)
}

fun tryWrite(type: Byte, structureId: Int, data: ByteIterable, expiredLoggables: ExpiredLoggableCollection): Long {
fun tryWrite(
type: Byte,
structureId: Int,
data: ByteIterable,
expiredLoggables: ExpiredLoggableCollection
): Long {
// allow new file creation only if new file starts loggable
val result = writeContinuously(type, structureId, data, expiredLoggables)
if (result < 0) {
@@ -1419,7 +1465,12 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
return write(loggable.type, loggable.structureId, loggable.data, expiredLoggables)
}

fun write(type: Byte, structureId: Int, data: ByteIterable, expiredLoggables: ExpiredLoggableCollection): Long {
fun write(
type: Byte,
structureId: Int,
data: ByteIterable,
expiredLoggables: ExpiredLoggableCollection
): Long {
// allow new file creation only if new file starts loggable
var result = writeContinuously(type, structureId, data, expiredLoggables)
if (result < 0) {
@@ -1686,7 +1737,8 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
val exceptionMessage = StringBuilder()
exceptionMessage.append(
"Can't acquire environment lock after "
).append(lockTimeout).append(" ms.\n\n Lock owner info: \n").append(dataWriter.lockInfo())
).append(lockTimeout).append(" ms.\n\n Lock owner info: \n")
.append(dataWriter.lockInfo())
if (dataWriter is AsyncFileDataWriter) {
exceptionMessage.append("\n Lock file path: ").append(dataWriter.lockFilePath())
}
@@ -1719,20 +1771,23 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
if (isNull) {
writer.write(type xor 0x80.toByte())
} else {
val structureIdIterable = CompressedUnsignedLongByteIterable.getIterable(structureId.toLong())
val structureIdIterable =
CompressedUnsignedLongByteIterable.getIterable(structureId.toLong())
val dataLength = data.length
val dataLengthIterable = CompressedUnsignedLongByteIterable.getIterable(dataLength.toLong())
val dataLengthIterable =
CompressedUnsignedLongByteIterable.getIterable(dataLength.toLong())
recordLength += structureIdIterable.length
recordLength += dataLengthIterable.length
recordLength += dataLength

val leftInPage =
cachePageSize - (result.toInt() and (cachePageSize - 1)) - BufferedDataWriter.HASH_CODE_SIZE
val delta = if (leftInPage in 1 until recordLength && recordLength < (cachePageSize shr 4)) {
leftInPage + BufferedDataWriter.HASH_CODE_SIZE
} else {
0
}
val delta =
if (leftInPage in 1 until recordLength && recordLength < (cachePageSize shr 4)) {
leftInPage + BufferedDataWriter.HASH_CODE_SIZE
} else {
0
}

if (!writer.fitsIntoSingleFile(fileLengthBound, recordLength + delta)) {
return -1L
@@ -1802,7 +1857,11 @@ class Log(val config: LogConfig, expectedEnvironmentVersion: Int) : Closeable, C
return writer.mutableBlocksUnsafe()
}

fun updateBlockSetHighAddressUnsafe(prevHighAddress: Long, highAddress: Long, blockSet: BlockSet.Immutable) {
fun updateBlockSetHighAddressUnsafe(
prevHighAddress: Long,
highAddress: Long,
blockSet: BlockSet.Immutable
) {
writer.updateBlockSetHighAddressUnsafe(prevHighAddress, highAddress, blockSet)
}

