Skip to content

Commit

Permalink
MINIFICPP-1372 - Allow async content deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdebreceni committed Aug 8, 2023
1 parent d559764 commit e32c6ae
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 28 deletions.
11 changes: 11 additions & 0 deletions CONFIGURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ Finally, as the last line is commented out, it will make the state manager use p

When multiple repositories use the same directory (as with `minifidb://` scheme) they should either be all plaintext or all encrypted with the same key.

### Configuring Repository Cleanup

When a flow file content is no longer needed we can specify the deletion strategy.

# any value other than 0 enables async cleanup with the specified period
# while a value of 0 sec triggers an immediate deletion as soon as the resource
# is not needed
# (the default value is 1 sec)
nifi.database.content.repository.purge.period = 1 sec


### Configuring Volatile and NO-OP Repositories
Each of the repositories can be configured to be volatile ( state kept in memory and flushed
upon restart ) or persistent. Currently, the flow file and provenance repositories can persist
Expand Down
3 changes: 3 additions & 0 deletions conf/minifi.properties
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ nifi.content.repository.class.name=DatabaseContentRepository
# nifi.flowfile.repository.rocksdb.compaction.period=2 min
# nifi.database.content.repository.rocksdb.compaction.period=2 min

# setting this value to 0 enables synchronous deletion
# nifi.database.content.repository.purge.period = 1 sec

#nifi.remote.input.secure=true
#nifi.security.need.ClientAuth=
#nifi.security.client.certificate=
Expand Down
92 changes: 71 additions & 21 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "database/RocksDbUtils.h"
#include "database/StringAppender.h"
#include "core/Resource.h"
#include "core/TypedValues.h"

namespace org::apache::nifi::minifi::core::repository {

Expand All @@ -40,6 +41,13 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
} else {
directory_ = (configuration->getHome() / "dbcontentrepository").string();
}
auto purge_period_str = configuration->get(Configure::nifi_dbcontent_repository_purge_period).value_or("1 s");
if (auto purge_period_val = core::TimePeriodValue::fromString(purge_period_str)) {
purge_period_ = purge_period_val->getMilliseconds();
} else {
logger_->log_error("Malformed delete period value, expected time format: '%s'", purge_period_str);
purge_period_ = std::chrono::seconds{1};
}
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext");

Expand Down Expand Up @@ -107,10 +115,15 @@ void DatabaseContentRepository::start() {
return;
}
if (compaction_period_.count() != 0) {
compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
compaction_thread_ = std::make_unique<utils::StoppableThread>([this] {
runCompaction();
});
}
if (purge_period_.count() != 0) {
gc_thread_ = std::make_unique<utils::StoppableThread>([this] {
runGc();
});
}
}

void DatabaseContentRepository::stop() {
Expand All @@ -120,6 +133,7 @@ void DatabaseContentRepository::stop() {
opendb->FlushWAL(true);
}
compaction_thread_.reset();
gc_thread_.reset();
}
}

Expand Down Expand Up @@ -198,27 +212,63 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
}

bool DatabaseContentRepository::removeKey(const std::string& content_path) {
if (!is_valid_ || !db_) {
logger_->log_error("DB is not valid, could not delete %s", content_path);
return false;
}
auto opendb = db_->open();
if (!opendb) {
logger_->log_error("Could not open DB, did not delete %s", content_path);
return false;
}
rocksdb::Status status;
status = opendb->Delete(rocksdb::WriteOptions(), content_path);
if (status.ok()) {
logger_->log_debug("Deleting resource %s", content_path);
return true;
} else if (status.IsNotFound()) {
logger_->log_debug("Resource %s was not found", content_path);
return true;
} else {
logger_->log_error("Attempted, but could not delete %s", content_path);
return false;
if (purge_period_ == std::chrono::seconds(0)) {
if (!is_valid_ || !db_)
return false;
// synchronous deletion
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::Status status = opendb->Delete(rocksdb::WriteOptions(), content_path);
if (status.ok()) {
logger_->log_debug("Deleting resource %s", content_path);
return true;
} else if (status.IsNotFound()) {
logger_->log_debug("Resource %s was not found", content_path);
return true;
} else {
logger_->log_debug("Attempted, but could not delete %s", content_path);
return false;
}
}
// asynchronous deletion
std::lock_guard guard(keys_mtx_);
logger_->log_debug("Staging resource for deletion %s", content_path);
keys_to_delete_.push_back(content_path);
return true;
}

void DatabaseContentRepository::runGc() {
do {
auto opendb = db_->open();
if (!opendb) {
continue;
}
std::vector<std::string> keys;
{
std::lock_guard guard(keys_mtx_);
keys = std::exchange(keys_to_delete_, std::vector<std::string>{});
}
auto batch = opendb->createWriteBatch();
for (auto& key : keys) {
batch.Delete(key);
}
rocksdb::Status status;
status = opendb->Write(rocksdb::WriteOptions(), &batch);
if (status.ok()) {
for (auto& key : keys) {
logger_->log_debug("Deleted resource async %s", key);
}
} else {
for (auto& key : keys) {
logger_->log_debug("Failed to delete resource async %s", key);
}
// move keys we could not delete back to the list for a retry
std::lock_guard guard(keys_mtx_);
keys_to_delete_.insert(keys_to_delete_.end(), keys.begin(), keys.end());
}
} while (!utils::StoppableThread::waitForStopRequest(purge_period_));
}

std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim& claim, bool /*append*/, minifi::internal::WriteBatch* batch) {
Expand Down
9 changes: 9 additions & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <utility>
#include <thread>
#include <vector>

#include "core/ContentRepository.h"
#include "core/BufferedContentSession.h"
Expand Down Expand Up @@ -78,6 +79,9 @@ class DatabaseContentRepository : public core::ContentRepository {
uint64_t getRepositoryEntryCount() const override;
std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const override;

private:
void runGc();

protected:
bool removeKey(const std::string& content_path) override;

Expand All @@ -92,6 +96,11 @@ class DatabaseContentRepository : public core::ContentRepository {

std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD};
std::unique_ptr<utils::StoppableThread> compaction_thread_;

std::chrono::milliseconds purge_period_{std::chrono::seconds{1}};
std::mutex keys_mtx_;
std::vector<std::string> keys_to_delete_;
std::unique_ptr<utils::StoppableThread> gc_thread_;
};

} // namespace org::apache::nifi::minifi::core::repository
1 change: 1 addition & 0 deletions libminifi/include/properties/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Configuration : public Properties {
// these are internal properties related to the rocksdb backend
static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period";
static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period";
static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period";

static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
Expand Down
1 change: 1 addition & 0 deletions libminifi/src/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal
{Configuration::nifi_dbcontent_repository_directory_default, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_dbcontent_repository_purge_period, gsl::make_not_null(&core::StandardPropertyTypes::TIME_PERIOD_TYPE)},
{Configuration::nifi_remote_input_secure, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)},
{Configuration::nifi_sensitive_props_additional_keys, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
Expand Down
1 change: 1 addition & 0 deletions libminifi/test/persistence-tests/PersistenceTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
auto config = std::make_shared<minifi::Configure>();
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, (dir / "content_repository").string());
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
config->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "0 s");

std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
Expand Down
33 changes: 27 additions & 6 deletions libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "../Catch.h"
#include "../unit/ProvenanceTestHelper.h"
#include "../unit/ContentRepositoryDependentTests.h"
#include "IntegrationTestUtils.h"

class TestDatabaseContentRepository : public core::repository::DatabaseContentRepository {
public:
Expand Down Expand Up @@ -79,6 +80,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {

TEST_CASE("Delete Claim", "[TestDBCR2]") {
TestController testController;
LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>();
auto dir = testController.createTempDirectory();
auto content_repo = std::make_shared<TestDatabaseContentRepository>();

Expand All @@ -103,16 +105,35 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {

configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
REQUIRE(content_repo->initialize(configuration));

content_repo->remove(*claim);
std::string readstr;

auto read_stream = content_repo->read(*claim);
SECTION("Sync") {
configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "0 s");
REQUIRE(content_repo->initialize(configuration));

std::string readstr;
content_repo->remove(*claim);

// error tells us we have an invalid stream
REQUIRE(minifi::io::isError(read_stream->read(readstr)));
auto read_stream = content_repo->read(*claim);

// error tells us we have an invalid stream
REQUIRE(minifi::io::isError(read_stream->read(readstr)));
}

SECTION("Async") {
configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "100 ms");
REQUIRE(content_repo->initialize(configuration));
content_repo->start();

content_repo->remove(*claim);

// an immediate read will still be able to access the content
REQUIRE_FALSE(minifi::io::isError(content_repo->read(*claim)->read(readstr)));

REQUIRE(minifi::utils::verifyEventHappenedInPollTime(1s, [&] {
return minifi::io::isError(content_repo->read(*claim)->read(readstr));
}));
}
}

TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void provisionRepo(minifi::provenance::ProvenanceRepository& repo, size_t number
}

void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, uint64_t keyCount) {
uint64_t k = keyCount;
uint64_t k = std::numeric_limits<uint64_t>::max();

for (int i = 0; i < 50; ++i) {
std::this_thread::sleep_for(100ms);
Expand Down

0 comments on commit e32c6ae

Please sign in to comment.