Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

publish: build checkpoint files during ledger close #4446

Merged
merged 10 commits into from
Oct 18, 2024
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.81.0"
channel = "1.82.0"
12 changes: 12 additions & 0 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <set>
#include <thread>

#include "history/FileTransferInfo.h"
#include "medida/counter.h"
#include "medida/meter.h"
#include "medida/metrics_registry.h"
Expand Down Expand Up @@ -134,6 +135,17 @@ BucketManagerImpl::initialize()
mApp.getConfig().QUERY_SNAPSHOT_LEDGERS);
}
}

// Create persistent publish directories
// Note: HISTORY_FILE_TYPE_BUCKET is already tracked by BucketList in
// BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database
// so create the remaining ledger header, transactions and results
// directories
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig());
HistoryManager::createPublishQueueDir(mApp.getConfig());
}

void
Expand Down
1 change: 0 additions & 1 deletion src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,6 @@ TEST_CASE_VERSIONS(
auto& hm = app->getHistoryManager();
auto& bm = app->getBucketManager();
auto& bl = bm.getBucketList();
auto& lm = app->getLedgerManager();
hm.setPublicationEnabled(false);
app->getHistoryArchiveManager().initializeHistoryArchive(
tcfg.getArchiveDirName());
Expand Down
5 changes: 3 additions & 2 deletions src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ ApplyCheckpointWork::openInputFiles()
ZoneScoped;
mHdrIn.close();
mTxIn.close();
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
FileTransferInfo hi(mDownloadDir, FileType::HISTORY_FILE_TYPE_LEDGER,
mCheckpoint);
FileTransferInfo ti(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpoint);
CLOG_DEBUG(History, "Replaying ledger headers from {}",
hi.localPath_nogz());
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/CatchupManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "catchup/CatchupWork.h"
#include "herder/LedgerCloseData.h"
#include "history/FileTransferInfo.h"
#include <functional>
#include <memory>
#include <system_error>
Expand Down Expand Up @@ -115,6 +116,6 @@ class CatchupManager
virtual void ledgerChainsVerificationFailed(uint32_t num = 1) = 0;
virtual void bucketsApplied(uint32_t num = 1) = 0;
virtual void txSetsApplied(uint32_t num = 1) = 0;
virtual void fileDownloaded(std::string type, uint32_t num = 1) = 0;
virtual void fileDownloaded(FileType type, uint32_t num = 1) = 0;
};
}
13 changes: 7 additions & 6 deletions src/catchup/CatchupManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,26 +495,27 @@ CatchupManagerImpl::txSetsApplied(uint32_t num)
}

void
CatchupManagerImpl::fileDownloaded(std::string type, uint32_t num)
CatchupManagerImpl::fileDownloaded(FileType type, uint32_t num)
{
if (type == HISTORY_FILE_TYPE_BUCKET)
if (type == FileType::HISTORY_FILE_TYPE_BUCKET)
{
mMetrics.mBucketsDownloaded += num;
}
else if (type == HISTORY_FILE_TYPE_LEDGER)
else if (type == FileType::HISTORY_FILE_TYPE_LEDGER)
{
mMetrics.mCheckpointsDownloaded += num;
}
else if (type == HISTORY_FILE_TYPE_TRANSACTIONS)
else if (type == FileType::HISTORY_FILE_TYPE_TRANSACTIONS)
{
mMetrics.mTxSetsDownloaded += num;
}
else if (type != HISTORY_FILE_TYPE_RESULTS && type != HISTORY_FILE_TYPE_SCP)
else if (type != FileType::HISTORY_FILE_TYPE_RESULTS &&
type != FileType::HISTORY_FILE_TYPE_SCP)
{
throw std::runtime_error(fmt::format(
FMT_STRING(
"CatchupManagerImpl::fileDownloaded unknown file type {}"),
type));
typeString(type)));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/catchup/CatchupManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CatchupManagerImpl : public CatchupManager
void ledgerChainsVerificationFailed(uint32_t num) override;
void bucketsApplied(uint32_t num) override;
void txSetsApplied(uint32_t num) override;
void fileDownloaded(std::string type, uint32_t num) override;
void fileDownloaded(FileType type, uint32_t num) override;

#ifdef BUILD_TESTS
std::map<uint32_t, LedgerCloseData> const&
Expand Down
8 changes: 4 additions & 4 deletions src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
// Batch download has default retries ("a few") to ensure we rotate through
// archives
auto getLedgers = std::make_shared<BatchDownloadWork>(
mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *mDownloadDir,
mArchive);
mApp, checkpointRange, FileType::HISTORY_FILE_TYPE_LEDGER,
*mDownloadDir, mArchive);
mRangeEndPromise = std::promise<LedgerNumHashPair>();
mRangeEndFuture = mRangeEndPromise.get_future().share();
mRangeEndPromise.set_value(rangeEnd);
Expand Down Expand Up @@ -598,8 +598,8 @@ CatchupWork::runCatchupStep()
auto checkpoint =
app.getHistoryManager().checkpointContainingLedger(
ledgerSeq);
auto ft =
FileTransferInfo(dir, HISTORY_FILE_TYPE_LEDGER, checkpoint);
auto ft = FileTransferInfo(
dir, FileType::HISTORY_FILE_TYPE_LEDGER, checkpoint);

return setHerderStateTo(ft, ledgerSeq, app);
};
Expand Down
9 changes: 5 additions & 4 deletions src/catchup/DownloadApplyTxsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ DownloadApplyTxsWork::yieldMoreWork()

CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
HISTORY_FILE_TYPE_TRANSACTIONS, mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpointToQueue);
auto getAndUnzip =
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive);
Expand All @@ -67,8 +68,8 @@ DownloadApplyTxsWork::yieldMoreWork()
auto archive = getFile->getArchive();
if (archive)
{
FileTransferInfo ti(dir, HISTORY_FILE_TYPE_TRANSACTIONS,
checkpoint);
FileTransferInfo ti(
dir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint);
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ti.remoteName());
}
Expand Down
17 changes: 14 additions & 3 deletions src/catchup/VerifyLedgerChainWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint()
// trusted hash passed in. If LCL is reached, verify that it agrees with
// the chain.

FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_LEDGER,
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_LEDGER,
mCurrCheckpoint);
XDRInputFileStream hdrIn;
hdrIn.open(ft.localPath_nogz());
Expand All @@ -185,6 +185,8 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint()
CLOG_DEBUG(History, "Verifying ledger headers from {} for checkpoint {}",
ft.localPath_nogz(), mCurrCheckpoint);

auto const& hm = mApp.getHistoryManager();

while (hdrIn)
{
try
Expand Down Expand Up @@ -243,6 +245,15 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint()

if (beginCheckpoint)
{
if (!hm.isFirstLedgerInCheckpoint(curr.header.ledgerSeq))
{
CLOG_ERROR(
History, "Checkpoint did not start with {} - got {}",
hm.firstLedgerInCheckpointContaining(curr.header.ledgerSeq),
curr.header.ledgerSeq);
return HistoryManager::VERIFY_STATUS_ERR_MISSING_ENTRIES;
}

// At the beginning of checkpoint, we can't verify the link with
// previous ledger, so at least verify that header content hashes to
// correct value
Expand Down Expand Up @@ -301,8 +312,8 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint()
// or at mRange.last() if history chain file was valid and we
// reached last ledger in the range. Any other ledger here means
// that file is corrupted.
CLOG_ERROR(History, "History chain did not end with {} or {}",
mCurrCheckpoint, mRange.last());
CLOG_ERROR(History, "History chain did not end with {} or {} - got {}",
mCurrCheckpoint, mRange.last(), curr.header.ledgerSeq);
return HistoryManager::VERIFY_STATUS_ERR_MISSING_ENTRIES;
}

Expand Down
12 changes: 9 additions & 3 deletions src/database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool Database::gDriversRegistered = false;

// smallest schema version supported
static unsigned long const MIN_SCHEMA_VERSION = 21;
static unsigned long const SCHEMA_VERSION = 22;
static unsigned long const SCHEMA_VERSION = 23;

// These should always match our compiled version precisely, since we are
// using a bundled version to get access to carray(). But in case someone
Expand Down Expand Up @@ -213,7 +213,11 @@ Database::applySchemaUpgrade(unsigned long vers)
switch (vers)
{
case 22:
deprecateTransactionFeeHistory(*this);
dropSupportTransactionFeeHistory(*this);
break;
case 23:
mApp.getHistoryManager().dropSQLBasedPublish();
Upgrades::dropSupportUpgradeHistory(*this);
break;
default:
throw std::runtime_error("Unknown DB schema version");
Expand Down Expand Up @@ -471,7 +475,9 @@ Database::initialize()
PersistentState::dropAll(*this);
ExternalQueue::dropAll(*this);
LedgerHeaderUtils::dropAll(*this);
dropTransactionHistory(*this, mApp.getConfig());
// No need to re-create txhistory, will be dropped during
// upgradeToCurrentSchema anyway
dropSupportTxHistory(*this);
HistoryManager::dropAll(*this);
HerderPersistence::dropAll(*this);
BanManager::dropAll(*this);
Expand Down
10 changes: 0 additions & 10 deletions src/database/DatabaseUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,5 @@ deleteOldEntriesHelper(soci::session& sess, uint32_t ledgerSeq, uint32_t count,
<< " <= " << m;
}
}

void
deleteNewerEntriesHelper(soci::session& sess, uint32_t ledgerSeq,
std::string const& tableName,
std::string const& ledgerSeqColumn)
{
sess << "DELETE FROM " << tableName << " WHERE " << ledgerSeqColumn
<< " >= " << ledgerSeq;
}

}
}
4 changes: 0 additions & 4 deletions src/database/DatabaseUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,5 @@ namespace DatabaseUtils
void deleteOldEntriesHelper(soci::session& sess, uint32_t ledgerSeq,
uint32_t count, std::string const& tableName,
std::string const& ledgerSeqColumn);

void deleteNewerEntriesHelper(soci::session& sess, uint32_t ledgerSeq,
std::string const& tableName,
std::string const& ledgerSeqColumn);
}
}
2 changes: 0 additions & 2 deletions src/herder/HerderPersistence.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ class HerderPersistence
static void dropAll(Database& db);
static void deleteOldEntries(Database& db, uint32_t ledgerSeq,
uint32_t count);
static void deleteNewerEntries(Database& db, uint32_t ledgerSeq);

static void createQuorumTrackingTable(soci::session& sess);
};
}
10 changes: 0 additions & 10 deletions src/herder/HerderPersistenceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,4 @@ HerderPersistence::deleteOldEntries(Database& db, uint32_t ledgerSeq,
DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count,
"scpquorums", "lastledgerseq");
}

void
HerderPersistence::deleteNewerEntries(Database& db, uint32_t ledgerSeq)
{
ZoneScoped;
DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq,
"scphistory", "ledgerseq");
DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq,
"scpquorums", "lastledgerseq");
}
}
48 changes: 2 additions & 46 deletions src/herder/Upgrades.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,53 +703,9 @@ Upgrades::dropAll(Database& db)
}

void
Upgrades::storeUpgradeHistory(Database& db, uint32_t ledgerSeq,
LedgerUpgrade const& upgrade,
LedgerEntryChanges const& changes, int index)
Upgrades::dropSupportUpgradeHistory(Database& db)
{
ZoneScoped;
xdr::opaque_vec<> upgradeContent(xdr::xdr_to_opaque(upgrade));
std::string upgradeContent64 = decoder::encode_b64(upgradeContent);

xdr::opaque_vec<> upgradeChanges(xdr::xdr_to_opaque(changes));
std::string upgradeChanges64 = decoder::encode_b64(upgradeChanges);

auto prep = db.getPreparedStatement(
"INSERT INTO upgradehistory "
"(ledgerseq, upgradeindex, upgrade, changes) VALUES "
"(:seq, :upgradeindex, :upgrade, :changes)");

auto& st = prep.statement();
st.exchange(soci::use(ledgerSeq));
st.exchange(soci::use(index));
st.exchange(soci::use(upgradeContent64));
st.exchange(soci::use(upgradeChanges64));
st.define_and_bind();
{
ZoneNamedN(insertUpgradeZone, "insert upgradehistory", true);
st.execute(true);
}

if (st.get_affected_rows() != 1)
{
throw std::runtime_error("Could not update data in SQL");
}
}

void
Upgrades::deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count)
{
ZoneScoped;
DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count,
"upgradehistory", "ledgerseq");
}

void
Upgrades::deleteNewerEntries(Database& db, uint32_t ledgerSeq)
{
ZoneScoped;
DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq,
"upgradehistory", "ledgerseq");
db.getSession() << "DROP TABLE IF EXISTS upgradehistory";
}

static void
Expand Down
9 changes: 1 addition & 8 deletions src/herder/Upgrades.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,7 @@ class Upgrades
uint64_t time, bool& updated);

static void dropAll(Database& db);

static void storeUpgradeHistory(Database& db, uint32_t ledgerSeq,
LedgerUpgrade const& upgrade,
LedgerEntryChanges const& changes,
int index);
static void deleteOldEntries(Database& db, uint32_t ledgerSeq,
uint32_t count);
static void deleteNewerEntries(Database& db, uint32_t ledgerSeq);
static void dropSupportUpgradeHistory(Database& db);

private:
UpgradeParameters mParams;
Expand Down
Loading
Loading