Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to boost fiber #2075

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions libraries/chain/db_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include <graphene/protocol/fee_schedule.hpp>

#include <fc/asio.hpp>
#include <fc/io/raw.hpp>
#include <fc/thread/parallel.hpp>

Expand Down Expand Up @@ -822,9 +823,9 @@ void database::_precompute_parallel( const Trx* trx, const size_t count, const u
}
}

fc::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
boost::fibers::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
{ try {
std::vector<fc::future<void>> workers;
std::vector<boost::fibers::future<void>> workers;
if( !block.transactions.empty() )
{
if( (skip & skip_expensive) == skip_expensive )
Expand All @@ -850,16 +851,20 @@ fc::future<void> database::precompute_parallel( const signed_block& block, const
block.id();

if( workers.empty() )
return fc::future< void >( fc::promise< void >::create( true ) );
{
boost::fibers::promise< void > done;
done.set_value();
return done.get_future();
}

auto first = workers.begin();
auto worker = first;
while( ++worker != workers.end() )
worker->wait();
return *first;
return std::move( *first );
} FC_LOG_AND_RETHROW() }

fc::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
boost::fibers::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
{
return fc::do_parallel([this,&trx] () {
_precompute_parallel( &trx, 1, skip_nothing );
Expand Down
4 changes: 2 additions & 2 deletions libraries/chain/db_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void database::reindex( fc::path data_dir )

size_t total_block_size = _block_id_to_block.total_block_size();
const auto& gpo = get_global_properties();
std::queue< std::tuple< size_t, signed_block, fc::future< void > > > blocks;
std::queue< std::tuple< size_t, signed_block, boost::fibers::future< void > > > blocks;
uint32_t next_block_num = head_block_num() + 1;
uint32_t i = next_block_num;
while( next_block_num <= last_block_num || !blocks.empty() )
Expand All @@ -93,7 +93,7 @@ void database::reindex( fc::path data_dir )
{
if( block->timestamp >= last_block->timestamp - gpo.parameters.maximum_time_until_expiration )
skip &= ~skip_transaction_dupe_check;
blocks.emplace( processed_block_size, std::move(*block), fc::future<void>() );
blocks.emplace( processed_block_size, std::move(*block), boost::fibers::future<void>() );
std::get<2>(blocks.back()) = precompute_parallel( std::get<1>(blocks.back()), skip );
}
else
Expand Down
9 changes: 6 additions & 3 deletions libraries/chain/include/graphene/chain/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
#include <graphene/db/object_database.hpp>
#include <graphene/db/object.hpp>
#include <graphene/db/simple_index.hpp>
#include <fc/signals.hpp>

#include <fc/signals.hpp>
#include <fc/log/logger.hpp>

#include <boost/fiber/future.hpp>

#include <map>

namespace graphene { namespace chain {
Expand Down Expand Up @@ -453,7 +455,8 @@ namespace graphene { namespace chain {
* @return a future that will resolve to the input block with
* precomputations applied
*/
fc::future<void> precompute_parallel( const signed_block& block, const uint32_t skip = skip_nothing )const;
boost::fibers::future<void> precompute_parallel( const signed_block& block,
const uint32_t skip = skip_nothing )const;

/** Precomputes digests, signatures and operation validations.
* "Expensive" computations may be done in a parallel thread.
Expand All @@ -462,7 +465,7 @@ namespace graphene { namespace chain {
* @return a future that will resolve to the input transaction with
* precomputations applied
*/
fc::future<void> precompute_parallel( const precomputable_transaction& trx )const;
boost::fibers::future<void> precompute_parallel( const precomputable_transaction& trx )const;
private:
template<typename Trx>
void _precompute_parallel( const Trx* trx, const size_t count, const uint32_t skip )const;
Expand Down
4 changes: 2 additions & 2 deletions libraries/db/object_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void object_database::flush()
{
// ilog("Save object_database in ${d}", ("d", _data_dir));
fc::create_directories( _data_dir / "object_database.tmp" / "lock" );
std::vector<fc::future<void>> tasks;
std::vector<boost::fibers::future<void>> tasks;
tasks.reserve(200);
for( uint32_t space = 0; space < _index.size(); ++space )
{
Expand Down Expand Up @@ -109,7 +109,7 @@ void object_database::open(const fc::path& data_dir)
wlog("Ignoring locked object_database");
return;
}
std::vector<fc::future<void>> tasks;
std::vector<boost::fibers::future<void>> tasks;
tasks.reserve(200);
ilog("Opening object database from ${d} ...", ("d", data_dir));
for( uint32_t space = 0; space < _index.size(); ++space )
Expand Down
2 changes: 1 addition & 1 deletion libraries/fc
Submodule fc updated 64 files
+6 −9 CMakeLists.txt
+4 −1 include/fc/api.hpp
+65 −177 include/fc/asio.hpp
+0 −1 include/fc/io/stdio.hpp
+0 −10 include/fc/network/http/websocket.hpp
+1 −5 include/fc/network/tcp_socket.hpp
+1 −0 include/fc/rpc/api_connection.hpp
+9 −11 include/fc/rpc/cli.hpp
+11 −8 include/fc/rpc/state.hpp
+0 −15 include/fc/signals.hpp
+74 −0 include/fc/thread/async.hpp
+100 −0 include/fc/thread/fibers.hpp
+0 −344 include/fc/thread/future.hpp
+0 −109 include/fc/thread/mutex.hpp
+0 −35 include/fc/thread/non_preemptable_scope_check.hpp
+20 −17 include/fc/thread/parallel.hpp
+0 −22 include/fc/thread/priority.hpp
+0 −11 include/fc/thread/scoped_lock.hpp
+0 −35 include/fc/thread/spin_lock.hpp
+0 −35 include/fc/thread/spin_yield_lock.hpp
+0 −171 include/fc/thread/task.hpp
+0 −275 include/fc/thread/thread.hpp
+0 −84 include/fc/thread/thread_specific.hpp
+0 −42 include/fc/thread/unique_lock.hpp
+26 −89 src/asio.cpp
+12 −23 src/crypto/aes.cpp
+3 −160 src/io/iostream.cpp
+167 −0 src/io/stdio.cpp
+5 −7 src/log/appender.cpp
+4 −6 src/log/console_appender.cpp
+29 −24 src/log/file_appender.cpp
+1 −1 src/log/gelf_appender.cpp
+3 −6 src/log/log_message.cpp
+7 −8 src/log/logger.cpp
+173 −164 src/network/http/websocket.cpp
+120 −83 src/network/rate_limiting.cpp
+34 −26 src/network/tcp_socket.cpp
+37 −22 src/network/udp_socket.cpp
+59 −47 src/rpc/cli.cpp
+10 −16 src/rpc/state.cpp
+2 −2 src/rpc/websocket_api.cpp
+0 −256 src/thread/context.hpp
+214 −0 src/thread/fibers.cpp
+0 −140 src/thread/future.cpp
+0 −215 src/thread/mutex.cpp
+0 −20 src/thread/non_preemptable_scope_check.cpp
+121 −106 src/thread/parallel.cpp
+0 −46 src/thread/spin_lock.cpp
+0 −51 src/thread/spin_yield_lock.cpp
+0 −113 src/thread/task.cpp
+0 −530 src/thread/thread.cpp
+0 −835 src/thread/thread_d.hpp
+0 −65 src/thread/thread_specific.cpp
+0 −12 tests/CMakeLists.txt
+31 −18 tests/api_tests.cpp
+3 −5 tests/logging_tests.cpp
+41 −12 tests/network/http/websocket_test.cpp
+7 −5 tests/stacktrace_test.cpp
+108 −82 tests/thread/parallel_tests.cpp
+0 −249 tests/thread/task_cancel.cpp
+22 −27 tests/thread/thread_tests.cpp
+54 −0 tests/thread/worker_thread.hxx
+328 −0 vendor/boost/fiber/asio/detail/yield.hpp
+63 −0 vendor/boost/fiber/asio/yield.hpp
2 changes: 1 addition & 1 deletion libraries/net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ set(SOURCES node.cpp
add_library( graphene_net ${SOURCES} ${HEADERS} )

target_link_libraries( graphene_net
PUBLIC fc graphene_db graphene_protocol )
PUBLIC fc graphene_db graphene_protocol graphene_utilities )
target_include_directories( graphene_net
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
PRIVATE "${CMAKE_SOURCE_DIR}/libraries/chain/include"
Expand Down
28 changes: 2 additions & 26 deletions libraries/net/include/graphene/net/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <graphene/protocol/types.hpp>

#include <list>
#include <thread>

namespace graphene { namespace net {

Expand Down Expand Up @@ -293,35 +294,10 @@ namespace graphene { namespace net {
void disable_peer_advertising();
fc::variant_object get_call_statistics() const;
private:
std::unique_ptr<detail::node_impl, detail::node_impl_deleter> my;
std::unique_ptr<detail::node_impl> my;
};

class simulated_network : public node
{
public:
~simulated_network();
simulated_network(const std::string& user_agent) : node(user_agent) {}
void listen_to_p2p_network() override {}
void connect_to_p2p_network() override {}
void connect_to_endpoint(const fc::ip::endpoint& ep) override {}

fc::ip::endpoint get_actual_listening_endpoint() const override { return fc::ip::endpoint(); }

void sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers) override {}
void broadcast(const message& item_to_broadcast) override;
void add_node_delegate(node_delegate* node_delegate_to_add);

virtual uint32_t get_connection_count() const override { return 8; }
private:
struct node_info;
void message_sender(node_info* destination_node);
std::list<node_info*> network_nodes;
};


typedef std::shared_ptr<node> node_ptr;
typedef std::shared_ptr<simulated_network> simulated_network_ptr;

} } // graphene::net

FC_REFLECT(graphene::net::message_propagation_data, (received_time)(validated_time)(originating_peer));
Expand Down
5 changes: 2 additions & 3 deletions libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

#include <queue>
#include <boost/container/deque.hpp>
#include <fc/thread/future.hpp>

namespace graphene { namespace net
{
Expand Down Expand Up @@ -166,7 +165,7 @@ namespace graphene { namespace net

size_t _total_queued_messages_size = 0;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
boost::fibers::future<void> _send_queued_messages_done;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
Expand Down Expand Up @@ -260,7 +259,7 @@ namespace graphene { namespace net

uint32_t last_known_fork_block_number = 0;

fc::future<void> accept_or_connect_task_done;
boost::fibers::future<void> accept_or_connect_task_done;

firewall_check_state_data *firewall_check_state = nullptr;
private:
Expand Down
Loading