Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lite-client/query-utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ QueryInfo get_query_info(const lite_api::Function& f) {
[&](const lite_api::liteServer_getShardBlockProof& q) { from_block_id(q.id_); },
[&](const lite_api::liteServer_nonfinal_getCandidate& q) { /* t_simple */ },
[&](const lite_api::liteServer_nonfinal_getValidatorGroups& q) { /* t_simple */ },
[&](const lite_api::liteServer_nonfinal_getPendingShardBlocks& q) { /* t_simple */ },
[&](const lite_api::liteServer_getOutMsgQueueSizes& q) {
// This query is expected to be removed, as it is not fully compatible with separated liteservers
/* t_simple */
Expand Down
4 changes: 2 additions & 2 deletions tdactor/td/actor/coro_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ struct promise_type : promise_value<td::Result<T>> {
}
};

template <class T>
template <class T = Unit>
struct [[nodiscard]] Task {
using value_type = T;

Expand Down Expand Up @@ -360,7 +360,7 @@ struct [[nodiscard]] Task {
}
};

template <class T>
template <class T = Unit>
struct [[nodiscard]] StartedTask {
using value_type = T;

Expand Down
2 changes: 2 additions & 0 deletions tl/generate/scheme/lite_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ liteServer.nonfinal.candidate id:liteServer.nonfinal.candidateId data:bytes coll
liteServer.nonfinal.candidateInfo id:liteServer.nonfinal.candidateId available:Bool approved_weight:long signed_weight:long total_weight:long = liteServer.nonfinal.CandidateInfo;
liteServer.nonfinal.validatorGroupInfo next_block_id:tonNode.blockId cc_seqno:int prev:(vector tonNode.blockIdExt) candidates:(vector liteServer.nonfinal.candidateInfo) = liteServer.nonfinal.ValidatorGroupInfo;
liteServer.nonfinal.validatorGroups groups:(vector liteServer.nonfinal.validatorGroupInfo) = liteServer.nonfinal.ValidatorGroups;
liteServer.nonfinal.pendingShardBlocks signed_blocks:(vector tonNode.blockIdExt) candidates:(vector tonNode.blockIdExt) = liteServer.nonfinal.PendingShardBlocks;

---functions---

Expand Down Expand Up @@ -111,6 +112,7 @@ liteServer.getDispatchQueueMessages mode:# id:tonNode.blockIdExt addr:int256 aft

liteServer.nonfinal.getValidatorGroups mode:# wc:mode.0?int shard:mode.0?long = liteServer.nonfinal.ValidatorGroups;
liteServer.nonfinal.getCandidate id:liteServer.nonfinal.candidateId = liteServer.nonfinal.Candidate;
liteServer.nonfinal.getPendingShardBlocks mode:# wc:mode.0?int shard:mode.0?long = liteServer.nonfinal.PendingShardBlocks;

liteServer.queryPrefix = Object;
liteServer.query data:bytes = Object;
Expand Down
6 changes: 5 additions & 1 deletion tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,11 @@ db.files.package.key package_id:int key:Bool temp:Bool = db.files.Key;
db.files.index.value packages:(vector int) key_packages:(vector int) temp_packages:(vector int) = db.files.index.Value;
db.files.package.firstBlock workchain:int shard:long seqno:int unixtime:int lt:long = db.files.package.FirstBlock;
db.files.package.value package_id:int key:Bool temp:Bool firstblocks:(vector db.files.package.firstBlock) deleted:Bool
= db.files.package.Value;
= db.files.package.Value;

db.event.blockApplied block_id:tonNode.blockIdExt = db.Event;
db.event.blockSigned block_id:tonNode.blockIdExt = db.Event;
db.event.blockCandidateReceived block_id:tonNode.blockIdExt = db.Event;

---functions---

Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,7 @@ td::Status ValidatorEngine::load_global_config() {
validator_options_.write().set_hardforks(std::move(h));
validator_options_.write().set_catchain_broadcast_speed_multiplier(broadcast_speed_multiplier_catchain_);
validator_options_.write().set_parallel_validation(parallel_validation_);
validator_options_.write().set_db_event_fifo_path(db_event_fifo_path_);

for (auto &id : config_.collator_node_whitelist) {
validator_options_.write().set_collator_node_whitelisted_validator(id, true);
Expand Down Expand Up @@ -5608,6 +5609,9 @@ int main(int argc, char *argv[]) {
p.add_option('\0', "parallel-validation", "parallel validation over different accounts", [&]() {
acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_parallel_validation, true); });
});
p.add_option('\0', "db-event-fifo", "path to FIFO pipe for publishing DB events", [&](td::Slice s) {
acts.push_back([&x, s = s.str()]() { td::actor::send_closure(x, &ValidatorEngine::set_db_event_fifo_path, s); });
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ class ValidatorEngine : public td::actor::Actor {
ton::adnl::AdnlNodeIdShort shard_block_retainer_adnl_id_ = ton::adnl::AdnlNodeIdShort::zero();
bool shard_block_retainer_adnl_id_fullnode_ = false;
bool parallel_validation_ = false;
std::string db_event_fifo_path_;
ton::validator::fullnode::FullNodeOptions full_node_options_ = {.config_ = {},
.public_broadcast_speed_multiplier_ = 3.33,
.private_broadcast_speed_multiplier_ = 3.33,
Expand Down Expand Up @@ -391,6 +392,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_parallel_validation(bool value) {
parallel_validation_ = value;
}
void set_db_event_fifo_path(std::string value) {
db_event_fifo_path_ = std::move(value);
}
void set_initial_sync_delay(double value) {
full_node_options_.initial_sync_delay_ = value;
}
Expand Down
2 changes: 2 additions & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ set(VALIDATOR_DB_SOURCE
db/staticfilesdb.hpp
db/db-utils.cpp
db/db-utils.h
db/db-event-publisher.cpp
db/db-event-publisher.hpp

db/package.hpp
db/package.cpp
Expand Down
143 changes: 143 additions & 0 deletions validator/db/db-event-publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
This file is part of TON Blockchain Library.

TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.

TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/

#include "tl-utils/common-utils.hpp"
#include "tl-utils/tl-utils.hpp"

#include "db-event-publisher.hpp"

#if TD_PORT_POSIX
#include <cerrno>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif

namespace ton::validator {

void DbEventPublisher::publish(tl_object_ptr<ton_api::db_Event> event) {
#if !TD_PORT_POSIX
if (!unsupported_logged_) {
LOG(WARNING) << "DB events FIFO is not supported on this platform";
unsupported_logged_ = true;
}
return;
#else
if (disabled_) {
return;
}
auto status = ensure_ready();
if (status.is_error()) {
if (!ready_error_logged_) {
LOG(ERROR) << "Failed to prepare DB events FIFO '" << fifo_path_ << "': " << status;
ready_error_logged_ = true;
}
disabled_ = true;
return;
}
td::BufferSlice data = serialize_tl_object(event, true);
switch (write_once(data)) {
case WriteStatus::Ok:
no_reader_logged_ = false;
temp_error_logged_ = false;
break;
case WriteStatus::NoReader:
if (!no_reader_logged_) {
LOG(INFO) << "DB events FIFO '" << fifo_path_ << "' has no reader. Dropping event";
no_reader_logged_ = true;
}
break;
case WriteStatus::TemporaryError:
if (!temp_error_logged_) {
LOG(WARNING) << "DB events FIFO '" << fifo_path_ << "' is temporarily unavailable. Dropping event";
temp_error_logged_ = true;
}
break;
case WriteStatus::FatalError:
if (!write_error_logged_) {
LOG(ERROR) << "Failed to publish DB event to '" << fifo_path_ << "', disabling events";
write_error_logged_ = true;
}
disabled_ = true;
break;
}
#endif
}

#if TD_PORT_POSIX
td::Status DbEventPublisher::ensure_ready() {
if (fifo_ready_) {
return td::Status::OK();
}
struct stat st;
if (lstat(fifo_path_.c_str(), &st) == 0) {
if (!S_ISFIFO(st.st_mode)) {
return td::Status::Error(PSLICE() << "path '" << fifo_path_ << "' exists and is not a FIFO");
}
} else {
if (errno != ENOENT) {
return td::Status::PosixError(errno, PSLICE() << "stat failed for '" << fifo_path_ << "'");
}
if (mkfifo(fifo_path_.c_str(), 0660) != 0 && errno != EEXIST) {
return td::Status::PosixError(errno, PSLICE() << "mkfifo failed for '" << fifo_path_ << "'");
}
}
fifo_ready_ = true;
return td::Status::OK();
}

DbEventPublisher::WriteStatus DbEventPublisher::write_once(td::Slice data) {
td::int32 flags = O_WRONLY | O_NONBLOCK | O_CLOEXEC;
int fd = open(fifo_path_.c_str(), flags);
if (fd < 0) {
if (errno == ENXIO || errno == EPIPE) {
return WriteStatus::NoReader;
}
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
return WriteStatus::TemporaryError;
}
return WriteStatus::FatalError;
}
auto guard = td::ScopeExit() + [&] {
while (close(fd) != 0) {
if (errno != EINTR) {
break;
}
}
};
while (!data.empty()) {
auto written = write(fd, data.data(), data.size());
if (written < 0) {
if (errno == EINTR) {
continue;
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return WriteStatus::TemporaryError;
}
if (errno == EPIPE) {
return WriteStatus::NoReader;
}
return WriteStatus::FatalError;
}
data.remove_prefix(static_cast<size_t>(written));
}
return WriteStatus::Ok;
}
#endif

} // namespace ton::validator
51 changes: 51 additions & 0 deletions validator/db/db-event-publisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
This file is part of TON Blockchain Library.

TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.

TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/

#pragma once
#include <string>

#include "auto/tl/ton_api.h"
#include "td/actor/actor.h"
#include "td/actor/coro_utils.h"
#include "tl/TlObject.h"

namespace ton::validator {

class DbEventPublisher : public td::actor::Actor {
public:
explicit DbEventPublisher(std::string fifo_path) : fifo_path_(std::move(fifo_path)) {
}
void publish(tl_object_ptr<ton_api::db_Event> event);

private:
std::string fifo_path_;
bool fifo_ready_ = false;
bool disabled_ = false;
bool ready_error_logged_ = false;
bool write_error_logged_ = false;
bool no_reader_logged_ = false;
bool temp_error_logged_ = false;
bool unsupported_logged_ = false;

#if TD_PORT_POSIX
enum class WriteStatus { Ok, NoReader, TemporaryError, FatalError };
td::Status ensure_ready();
WriteStatus write_once(td::Slice data);
#endif
};

} // namespace ton::validator
22 changes: 22 additions & 0 deletions validator/full-node-custom-overlays.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ void FullNodeCustomOverlay::process_block_candidate_broadcast(PublicKeyHash src,
validator_set_hash, std::move(data));
}

void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in custom overlay \"" << name_ << "\" from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_shard_block_info_broadcast, block_id, query.block_->cc_seqno_,
std::move(query.block_->data_));
}

void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
Expand Down Expand Up @@ -175,6 +183,20 @@ void FullNodeCustomOverlay::send_block_candidate(BlockIdExt block_id, CatchainSe
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodeCustomOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in custom overlay \"" << name_
<< "\": " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(B));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}
}

void FullNodeCustomOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
Expand Down
2 changes: 2 additions & 0 deletions validator/full-node-custom-overlays.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressedV2 &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);

template <class T>
void process_broadcast(PublicKeyHash, T &) {
Expand All @@ -47,6 +48,7 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void send_broadcast(BlockBroadcast broadcast);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data);

void set_config(FullNodeConfig config) {
opts_.config_ = std::move(config);
Expand Down
4 changes: 2 additions & 2 deletions validator/full-node-fast-sync-overlays.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonN
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in fast sync overlay from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block_description_broadcast,
block_id, query.block_->cc_seqno_, std::move(query.block_->data_));
td::actor::send_closure(full_node_, &FullNode::process_shard_block_info_broadcast, block_id, query.block_->cc_seqno_,
std::move(query.block_->data_));
}

void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) {
Expand Down
4 changes: 2 additions & 2 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,8 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ex
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast from " << src << ": " << block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block_description_broadcast,
block_id, query.block_->cc_seqno_, std::move(query.block_->data_));
td::actor::send_closure(full_node_, &FullNode::process_shard_block_info_broadcast, block_id, query.block_->cc_seqno_,
std::move(query.block_->data_));
}

void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) {
Expand Down
Loading
Loading