#include <unordered_set>
#include <forward_list>
#include <boost/tuple/tuple.hpp>
#include <boost/circular_buffer.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/logic/tribool.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/numeric.hpp>
#include <boost/preprocessor/seq/for_each.hpp>
#include <boost/preprocessor/cat.hpp>
#include <boost/preprocessor/stringize.hpp>
namespace graphene { namespace net { namespace detail {
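// Cached messages are expired by a "block clock": once the clock has advanced past
// cache_duration_in_blocks, every entry indexed with an older clock value is erased below.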
if( block_clock > cache_duration_in_blocks )
  _message_cache.get<block_clock_index>().erase(_message_cache.get<block_clock_index>().begin(),
      _message_cache.get<block_clock_index>().lower_bound(block_clock - cache_duration_in_blocks ) );

_message_cache.insert( message_info(hash_of_message_to_cache,
                                    message_content_hash ) );

message_cache_container::index<message_hash_index>::type::const_iterator iter =
    _message_cache.get<message_hash_index>().find(hash_of_message_to_lookup );
if( iter != _message_cache.get<message_hash_index>().end() )
  return iter->message_body;

message_cache_container::index<message_contents_hash_index>::type::const_iterator iter =
    _message_cache.get<message_contents_hash_index>().find(hash_of_msg_contents_to_lookup );
if( iter != _message_cache.get<message_contents_hash_index>().end() )
  return iter->propagation_data;
#ifdef P2P_IN_DEDICATED_THREAD
std::weak_ptr<fc::thread> weak_thread;
std::shared_ptr<fc::thread> impl_thread(impl_to_delete->_thread);
weak_thread = impl_thread;
impl_thread->async([impl_to_delete](){ delete impl_to_delete; }, "delete node_impl").wait();
dlog("deleting the p2p thread");
if (weak_thread.expired())
  dlog("done deleting the p2p thread");
  dlog("failed to delete the p2p thread, we must be leaking a smart pointer somewhere");
#else // P2P_IN_DEDICATED_THREAD
delete impl_to_delete;
#endif // P2P_IN_DEDICATED_THREAD

#ifdef P2P_IN_DEDICATED_THREAD
# define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
# define VERIFY_CORRECT_THREAD() do {} while (0)
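// VERIFY_CORRECT_THREAD() asserts that the caller is running on the dedicated P2P thread
// when P2P_IN_DEDICATED_THREAD is defined; otherwise it compiles to a no-op.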
static void greatly_delay_next_conn_to( node_impl* impl, const fc::ip::endpoint& ep )
dlog( "Greatly delaying the next connection to endpoint ${ep}", ("ep", ep) );
    = impl->_potential_peer_db.lookup_entry_for_endpoint( ep );
if( updated_peer_record )
constexpr uint32_t failed_attempts_to_add = 120;
updated_peer_record->number_of_failed_connection_attempts += failed_attempts_to_add;
impl->_potential_peer_db.update_entry( *updated_peer_record );

static void save_successful_address( node_impl* impl, const fc::ip::endpoint& ep )
dlog( "Saving successfully connected endpoint ${ep} to peer database", ("ep", ep) );
auto updated_peer_record = impl->_potential_peer_db.lookup_or_create_entry_for_ep( ep );
constexpr uint16_t two = 2;
updated_peer_record.number_of_failed_connection_attempts /= two;
impl->_potential_peer_db.update_entry(updated_peer_record);
static void update_address_seen_time( node_impl* impl, const peer_connection* active_peer )
if( inbound_endpoint.valid() && inbound_endpoint->port() != 0 )
    = impl->_potential_peer_db.lookup_entry_for_endpoint( *inbound_endpoint );
if( updated_peer_record )
impl->_potential_peer_db.update_entry( *updated_peer_record );

static void update_address_seen_time( node_impl* impl, const peer_connection_ptr& active_peer )
update_address_seen_time( impl, active_peer.get() );

fc::flat_set<fc::ip::endpoint> list;
FC_ASSERT( !address_list.empty(), "The peer node list must not be empty" );
list.insert( fc::ip::endpoint::from_string(str) );
wlog( "Address ${addr} invalid.", ("addr", str) );
using generic_list_address_builder::generic_list_address_builder;
return !( list.find(in) == list.end() );
using generic_list_address_builder::generic_list_address_builder;
return ( list.find( in ) == list.end() );
std::shared_ptr<node_impl::address_builder> node_impl::address_builder::create_default_address_builder()
return std::make_shared<all_address_builder>();

if( inbound_endpoint.valid() && should_advertise( *inbound_endpoint ) )
update_address_seen_time( impl, active_peer );
reply.addresses.emplace_back( *inbound_endpoint,
                              active_peer->round_trip_delay,
                              active_peer->node_id,
                              active_peer->direction,
                              active_peer->is_firewalled );
node_impl::node_impl(const std::string& user_agent) :
  _user_agent_string(user_agent)

ilog( "cleaning up node" );
update_address_seen_time( this, active_peer );
wlog( "unexpected exception on close ${e}", ("e", e) );
dlog( "Saved node configuration to file ${filename}", ("filename", configuration_file_name ) );
catch ( const fc::canceled_exception&)
wlog( "error writing node configuration to file ${filename}: ${error}",
      ("filename", configuration_file_name )("error", except.to_detail_string() ) );
dlog("Starting an iteration of p2p_network_connect_loop().");
std::list<potential_peer_record> add_once_node_list;
dlog("Processing \"add once\" node list containing ${count} peers:",
     ("count", add_once_node_list.size()));
dlog(" ${peer}", ("peer", add_once_peer.endpoint));
if(!existing_connection_ptr)
dlog("Done processing \"add once\" node list");
bool initiated_connection_this_pass = false;
    && ( !last_connection_not_ok
initiated_connection_this_pass = true;
dlog("Still want to connect to more nodes, but I don't have any good candidates. Trying again in 15 seconds" );
dlog("I still have some \"add once\" nodes to connect to. Trying again in 15 seconds" );
dlog("I don't need any more connections, waiting forever until something changes" );
catch ( fc::timeout_exception& )
catch ( const fc::canceled_exception&)
ilog("p2p_network_connect_loop canceled" );
dlog("Triggering connect loop now" );
ilog("Starting an iteration of update_seed_nodes loop.");
for( const std::string& endpoint_string : _seed_nodes )
ilog("Done an iteration of update_seed_nodes loop.");
catch ( const fc::canceled_exception&)
ilog("update_seed_nodes_task canceled" );
constexpr uint32_t five = 5;
"update_seed_nodes_loop" );
dlog("requesting item ${item_hash} from peer ${endpoint}",
     ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) );
peer->sync_items_requested_from_peer.insert(item_to_request);
dlog("requesting ${item_count} item(s) ${items_to_request} from peer ${endpoint}",
     ("item_count", items_to_request.size())("items_to_request", items_to_request)("endpoint", peer->get_remote_endpoint()) );
for ( const item_hash_t& item_to_request : items_to_request)
peer->sync_items_requested_from_peer.insert(item_to_request);

dlog("beginning another iteration of the sync items loop" );
std::map<peer_connection_ptr, std::vector<item_hash_t> > sync_item_requests_to_send;
std::set<item_hash_t> sync_items_to_request;
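// sync_item_requests_to_send batches the ids to ask each peer for in this pass;
// sync_items_to_request tracks ids already assigned to some peer so the same
// block is not requested from two peers at once.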
if( peer->we_need_sync_items_from_peer &&
    sync_item_requests_to_send.find(peer) == sync_item_requests_to_send.end() &&
if (!peer->inhibit_fetching_sync_blocks)
for( const auto& item_to_potentially_request : peer->ids_of_items_to_get )
    sync_items_to_request.find(item_to_potentially_request) == sync_items_to_request.end() &&
sync_item_requests_to_send[peer].push_back(item_to_potentially_request);
sync_items_to_request.insert( item_to_potentially_request );
for( auto sync_item_request : sync_item_requests_to_send )
sync_item_requests_to_send.clear();
dlog("fetch_sync_items_loop is suspended pending backlog processing");
dlog("no sync items to fetch right now, going to sleep" );
dlog("Triggering fetch sync items loop now" );
if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
dlog("beginning an iteration of fetch items (${count} items to fetch)",

struct requested_item_count_index {};
struct peer_and_items_to_fetch
std::vector<item_id> item_ids;
bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; }
size_t number_of_items() const { return item_ids.size(); }
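// Pending requests are grouped per peer in a Boost.MultiIndex container with two views:
// a unique ordering by peer and a non-unique ordering by how many items have already been
// assigned to that peer, which makes it easy to start each pass with the least-loaded peers.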
using fetch_messages_to_send_set = boost::multi_index_container< peer_and_items_to_fetch, bmi::indexed_by<
    bmi::member<peer_and_items_to_fetch, peer_connection_ptr, &peer_and_items_to_fetch::peer> >,
    bmi::ordered_non_unique< bmi::tag<requested_item_count_index>,
    bmi::const_mem_fun<peer_and_items_to_fetch, size_t, &peer_and_items_to_fetch::number_of_items> >
fetch_messages_to_send_set items_by_peer;
items_by_peer.insert(peer_and_items_to_fetch(peer));
if (item_iter->timestamp < oldest_timestamp_to_fetch)
wlog("Unable to fetch item ${item} before its likely expiration time, "
     "removing it from our list of items to fetch",
     ("item", item_iter->item));
bool item_fetched = false;
for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
    peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
item_id item_id_to_fetch = item_iter->item;
peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(
items_by_peer.get<requested_item_count_index>().modify(peer_iter,
    [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
      peer_and_items.item_ids.push_back(item_id_to_fetch);
for (const peer_and_items_to_fetch& peer_and_items : items_by_peer)
std::map<uint32_t, std::vector<item_hash_t> > items_to_fetch_by_type;
for (const item_id& item : peer_and_items.item_ids)
for (auto& items_by_type : items_to_fetch_by_type)
dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}",
     ("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first)
     ("endpoint", peer_and_items.peer->get_remote_endpoint())
     ("hashes", items_by_type.second));
    items_by_type.second));
items_by_peer.clear();
catch (const fc::timeout_exception&)
dlog("Resuming fetch_items_loop due to timeout -- one of our peers should no longer be throttled");

dlog("beginning an iteration of advertise inventory");
std::unordered_set<item_id> inventory_to_advertise;
std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
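// Each pass advertises the newly collected inventory only to peers that are fully synced
// with us (peer_needs_sync_items_from_us == false), skipping items the peer has already
// advertised to us or that we have already advertised to it.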
if( !peer->peer_needs_sync_items_from_us )
std::map<uint32_t, std::vector<item_hash_t> > items_to_advertise_by_type;
size_t total_items_to_send = 0;
for (const item_id& item_to_advertise : inventory_to_advertise)
auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise);
auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise);
if (adv_to_peer == peer->inventory_advertised_to_peer.end() &&
    adv_to_us == peer->inventory_peer_advertised_to_us.end())
items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash);
peer->inventory_advertised_to_peer.insert(
++total_items_to_send;
testnetlog("advertising transaction ${id} to peer ${endpoint}",
           ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
dlog("advertising item ${id} to peer ${endpoint}",
     ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
if( adv_to_peer != peer->inventory_advertised_to_peer.end() )
  dlog("adv_to_peer != peer->inventory_advertised_to_peer.end() : ${adv_to_peer}",
       ("adv_to_peer", *adv_to_peer) );
if( adv_to_us != peer->inventory_peer_advertised_to_us.end() )
  dlog("adv_to_us != peer->inventory_peer_advertised_to_us.end() : ${adv_to_us}",
       ("adv_to_us", *adv_to_us) );
dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}",
     ("count", total_items_to_send)
     ("types", items_to_advertise_by_type.size())
     ("endpoint", peer->get_remote_endpoint()));
for (auto items_group : items_to_advertise_by_type)
inventory_messages_to_send.emplace_back(std::make_pair(
peer->clear_old_inventory();
for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
  iter->first->send_message(iter->second);
inventory_messages_to_send.clear();

std::list<peer_connection_ptr> peers_to_disconnect_gently;
std::list<peer_connection_ptr> peers_to_disconnect_forcibly;
std::list<peer_connection_ptr> peers_to_send_keep_alive;
std::list<peer_connection_ptr> peers_to_terminate;
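// Peers are sorted into these buckets while scanning the connection lists; each bucket
// is then processed after the scan completes (see the close_connection()/clear() calls below).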
if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
    handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
    handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
wlog("Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds",
     ("peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) );
wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
     ("status", handshaking_peer->negotiation_status)
     ("sent", handshaking_peer->get_total_bytes_sent())
     ("received", handshaking_peer->get_total_bytes_received()));
"Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
     ("peer", handshaking_peer->get_remote_endpoint())
     ("timeout", handshaking_timeout)
     ("status", handshaking_peer->negotiation_status)
     ("sent", handshaking_peer->get_total_bytes_sent())
     ("received", handshaking_peer->get_total_bytes_received())));
peers_to_disconnect_forcibly.push_back( handshaking_peer );

uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2;
if( active_peer->connection_initiation_time < active_disconnect_threshold &&
    active_peer->get_last_message_received_time() < active_disconnect_threshold )
wlog("Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds",
     ("peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) );
peers_to_disconnect_gently.push_back( active_peer );
bool disconnect_due_to_request_timeout = false;
if (!active_peer->sync_items_requested_from_peer.empty() &&
    active_peer->last_sync_item_received_time < active_ignored_request_threshold)
wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests",
     ("peer", active_peer->get_remote_endpoint())("count",
     active_peer->sync_items_requested_from_peer.size()));
disconnect_due_to_request_timeout = true;
if (!disconnect_due_to_request_timeout &&
    active_peer->item_ids_requested_from_peer &&
    active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}",
     ("peer", active_peer->get_remote_endpoint())
     ("synopsis", active_peer->item_ids_requested_from_peer->get<0>()));
disconnect_due_to_request_timeout = true;
if (!disconnect_due_to_request_timeout)
for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer)
if (item_and_time.second < active_ignored_request_threshold)
wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}",
     ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash));
disconnect_due_to_request_timeout = true;
if (disconnect_due_to_request_timeout)
peers_to_disconnect_forcibly.push_back(active_peer);
else if (active_peer->connection_initiation_time < active_send_keepalive_threshold &&
         active_peer->get_last_message_received_time() < active_send_keepalive_threshold)
wlog("Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds",
     ("peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) );
peers_to_send_keep_alive.push_back(active_peer);
else if (active_peer->we_need_sync_items_from_peer &&
         !active_peer->is_currently_handling_message() &&
         !active_peer->item_ids_requested_from_peer &&
         active_peer->ids_of_items_to_get.empty())
fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
        ("peer", active_peer->get_remote_endpoint()));
wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
     ("peer", active_peer->get_remote_endpoint()));
peers_to_disconnect_forcibly.push_back(active_peer);

if( closing_peer->connection_closed_time < closing_disconnect_threshold )
wlog("Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
     ("peer", closing_peer->get_remote_endpoint() ) );
peers_to_disconnect_forcibly.push_back( closing_peer );
uint32_t failed_terminate_timeout_seconds = 120;
    peer->get_connection_terminated_time() < failed_terminate_threshold)
wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
peers_to_terminate.push_back(peer);
peers_to_terminate.clear();
peer->close_connection();
peers_to_disconnect_forcibly.clear();
("last_message_received_seconds_ago", (peer->get_last_message_received_time()
("last_message_sent_seconds_ago", (peer->get_last_message_sent_time()
peers_to_disconnect_gently.clear();
peers_to_send_keep_alive.clear();
"kill_inactive_conns_loop" );
active_peer->expecting_address_message = true;
catch (const fc::canceled_exception& )
dlog("Caught exception while sending address request message to peer ${peer} : ${e}",
     ("peer", active_peer->get_remote_endpoint())("e", e));
    .lower_bound(oldest_failed_ids_to_keep);
    .erase(begin_iter, oldest_failed_ids_to_keep_iter);
"fetch_updated_peer_lists_loop" );

constexpr uint8_t seconds_per_minute = 60;
constexpr uint8_t minutes_per_hour = 60;
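// The bandwidth monitor is intended to run roughly once per second; if it was delayed,
// the per-second counters are rolled forward once for each elapsed second in the loop below.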
seconds_since_last_update = std::max(UINT32_C(1), seconds_since_last_update);
for (uint32_t i = 0; i < seconds_since_last_update - 1; ++i)
"bandwidth_monitor_loop" );
"dump_node_status_task");
#ifdef USE_PEERS_TO_DELETE_MUTEX
dlog("in delayed_peer_deletion_task with ${count} in queue", ("count", _peers_to_delete.size()));
dlog("_peers_to_delete cleared");
std::list<peer_connection_ptr> peers_to_delete_copy;
dlog("beginning an iteration of delayed_peer_deletion_task with ${count} in queue",
dlog("leaving delayed_peer_deletion_task");

#ifdef USE_PEERS_TO_DELETE_MUTEX
dlog("scheduling peer for deletion: ${peer} (may block on a mutex here)",
     ("peer", peer_to_delete->get_remote_endpoint()));
size_t number_of_peers_to_delete;
dlog("peer scheduled for deletion: ${peer}", ("peer", peer_to_delete->get_remote_endpoint()));
dlog("asyncing delayed_peer_deletion_task to delete ${size} peers",
     ("size", number_of_peers_to_delete));
"delayed_peer_deletion_task" );
dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})",
     ("size", number_of_peers_to_delete));
dlog("scheduling peer for deletion: ${peer} (this will not block)",
     ("peer", peer_to_delete->get_remote_endpoint()));
dlog("asyncing delayed_peer_deletion_task to delete ${size} peers", ("size", _peers_to_delete.size()));
"delayed_peer_deletion_task" );
dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})",
if (node_id == active_peer->node_id)
if (node_id == handshaking_peer->node_id)
  return handshaking_peer;

bool new_information_received = false;
if( 0 == address.remote_endpoint.port() )
if (address.last_seen_time > updated_peer_record.last_seen_time)
new_information_received = true;
updated_peer_record.last_seen_time = address.last_seen_time;
return new_information_received;

dlog("Currently have ${current} of [${desired}/${max}] connections",
dlog(" active: ${endpoint} with ${id} [${direction}]",
     ("endpoint", active_connection->get_remote_endpoint())
     ("id", active_connection->node_id)
     ("direction", active_connection->direction));
dlog(" handshaking: ${endpoint} with ${id} [${direction}]",
     ("endpoint", handshaking_connection->get_remote_endpoint())
     ("id", handshaking_connection->node_id)
     ("direction", handshaking_connection->direction));

dlog("handling message ${type} ${hash} size ${size} from peer ${endpoint}",
     ("size", received_message.size)
dlog("Unexpected message from peer ${peer} while we have requested to close connection",
switch ( received_message.msg_type.value() )
#if defined( __APPLE__ )
user_data["platform"] = "osx";
#elif defined( __OpenBSD__ )
user_data["platform"] = "obsd";
#elif defined( __linux__ )
user_data["platform"] = "linux";
#elif defined( _MSC_VER )
user_data["platform"] = "win32";
user_data["platform"] = "other";
user_data["bitness"] = sizeof(void*) * 8;
user_data["last_known_block_hash"] = fc::variant( head_block_id, 1 );
user_data["last_known_block_number"] = _delegate->get_block_number(head_block_id);
user_data["last_known_block_time"] = _delegate->get_block_time(head_block_id);

if (user_data.contains("graphene_git_revision_sha"))
if (user_data.contains("graphene_git_revision_unix_timestamp"))
    user_data["graphene_git_revision_unix_timestamp"].as<uint32_t>(1));
if (user_data.contains("fc_git_revision_sha"))
if (user_data.contains("fc_git_revision_unix_timestamp"))
    user_data["fc_git_revision_unix_timestamp"].as<uint32_t>(1));
if (user_data.contains("platform"))
  originating_peer->platform = user_data["platform"].as_string();
originating_peer->bitness = user_data["bitness"].as<uint32_t>(1);
if (user_data.contains("last_known_fork_block_number"))
wlog("Unexpected hello_message from peer ${peer}, disconnecting",
     ("peer", remote_endpoint) );
wlog("Received hello message from peer ${peer} on a different chain: ${message}",
     ("peer", remote_endpoint)
     ("message", hello_message_received) );
greatly_delay_next_conn_to( this, *remote_endpoint );
std::ostringstream rejection_message;
rejection_message << "You're on a different chain than I am. I'm on " << _chain_id.str() <<
    " and you're on " << hello_message_received.chain_id.str();
    rejection_message.str() );
shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
    shared_secret_encoder.result(), false );
wlog("Error when validating signature in hello message from peer ${peer}: ${e}",
if( !expected_node_public_key
wlog("Invalid signature in hello message from peer ${peer}",
     ("peer", remote_endpoint) );
    "Invalid signature in hello message" );
dlog("Peer ${endpoint} sent us a hello message without a valid node_id in user_data",
     ("endpoint", remote_endpoint ) );
if( null_node_id == peer_node_id )
wlog("The node_id in the hello_message from peer ${peer} is null, disconnecting",
     ("peer", remote_endpoint) );
ilog("Received a hello_message from peer ${peer} with id ${id} that is myself or claimed to be myself, "
     ("peer", remote_endpoint)
     ("id", peer_node_id) );
greatly_delay_next_conn_to( this, *remote_endpoint );
    "I'm connecting to myself" );
    || originating_peer->inbound_address == remote_endpoint->get_address() )
if (next_fork_block_number != 0)
uint32_t head_block_num = _delegate->get_block_number(_delegate->get_head_block_id());
if (next_fork_block_number < head_block_num)
#ifdef ENABLE_DEBUG_ULOGS
wlog("Received hello message from peer running a version of that can only understand blocks up to #${their_hard_fork}, but I'm at head block number #${my_block_number}",
     ("their_hard_fork", next_fork_block_number)("my_block_number", head_block_num));
std::ostringstream rejection_message;
rejection_message << "Your client is outdated -- you can only understand blocks up to #" << next_fork_block_number << ", but I'm already on block #" << head_block_num;
    rejection_message.str() );
    && originating_peer->node_public_key == already_connected_peer->node_public_key )
auto already_connected_endpoint = already_connected_peer->get_remote_endpoint();
ilog("Verified that endpoint ${ep} is reachable and belongs to peer ${peer} with id ${id}",
     ("ep", remote_endpoint)
     ("peer", already_connected_endpoint)
     ("id", already_connected_peer->node_id) );
already_connected_peer->additional_inbound_endpoints.insert( *remote_endpoint );
already_connected_peer->potential_inbound_endpoints[*remote_endpoint]
    || remote_endpoint->get_address().is_public_address()
    || !already_connected_peer->get_endpoint_for_connecting()->get_address().is_public_address() )
ilog("Saving verification result ${ep} for peer ${peer} with id ${id}",
     ("ep", remote_endpoint)
     ("peer", already_connected_endpoint)
     ("id", already_connected_peer->node_id) );
already_connected_peer->remote_inbound_endpoint = remote_endpoint;
    == already_connected_peer->negotiation_status )
save_successful_address( this, *remote_endpoint );
    "I'm already connected to you" );
ilog("Received a hello_message from peer ${peer} that I'm already connected to (with id ${id}), rejection",
     ("peer", remote_endpoint)
     ("id", originating_peer->node_id));
#ifdef ENABLE_P2P_DEBUGGING_API
else if(!_allowed_peers.empty() &&
        _allowed_peers.find(originating_peer->node_id) == _allowed_peers.end())
    "you are not in my allowed_peers list");
dlog("Received a hello_message from peer ${peer} who isn't in my allowed_peers list, rejection",
     ("peer", remote_endpoint ) );
#endif // ENABLE_P2P_DEBUGGING_API
ilog("peer ${peer} did not give an inbound port so I'm treating them as if they are firewalled.",
     ("peer", remote_endpoint) );
fc::flat_set<fc::ip::endpoint> endpoints_to_save;
endpoints_to_save.insert( *remote_endpoint );
endpoints_to_save.insert( fc::ip::endpoint( remote_endpoint->get_address(),
ilog("Saving potential endpoints to the peer database for peer ${peer}: ${endpoints}",
     ("peer", remote_endpoint) ("endpoints", endpoints_to_save) );
for( const auto& ep : endpoints_to_save )
    "not accepting any more incoming connections");
ilog("Received a hello_message from peer ${peer}, but I'm not accepting any more connections, rejection",
     ("peer", remote_endpoint));
ilog("Received a hello_message from peer ${peer}, sending reply to accept connection",
     ("peer", remote_endpoint));
wlog("Received an unexpected connection_accepted message from ${peer}",
ilog("Received a connection_accepted in response to my \"hello\" from ${peer}",
ilog("Received a rejection from ${peer} in response to my \"hello\", reason: \"${reason}\"",
     ("reason", connection_rejected_message_received.reason_string));
if (updated_peer_record)
wlog("Unexpected connection_rejected_message from peer ${peer}, disconnecting",
disconnect_from_peer( originating_peer, "Received an unexpected connection_rejected_message" );
wlog("Unexpected address_request_message from peer ${peer}, disconnecting",
dlog("Received an address request message from peer ${peer}",
    "I rejected your connection request (hello message) so I'm disconnecting" );

    const std::vector<std::string>& advertise_or_exclude_list )
if (algo == "exclude_list")
  _address_builder = std::make_shared<exclude_address_builder>(advertise_or_exclude_list);
else if (algo == "list")
  _address_builder = std::make_shared<list_address_builder>(advertise_or_exclude_list);
else if (algo == "nothing")

wlog("Received an unexpected address message containing ${size} addresses for peer ${peer}",
     ("size", address_message_received.addresses.size())
dlog("Received an address message containing ${size} addresses for peer ${peer}",
     ("size", address_message_received.addresses.size())
dlog(" ${endpoint} last seen ${time}, firewalled status ${fw}",
std::vector<graphene::net::address_info> updated_addresses;
updated_addresses.reserve( count );
disconnect_from_peer(originating_peer, "You rejected my connection request (hello message) so I'm disconnecting");
disconnect_from_peer(originating_peer, "I rejected your connection request (hello message) so I'm disconnecting");
wlog("Unexpected fetch_blockchain_item_ids_message from peer ${peer}, disconnecting",
disconnect_from_peer( originating_peer, "Received an unexpected fetch_blockchain_item_ids_message" );
dlog("sync: received a request for item ids starting at the beginning of the chain "
     "from peer ${peer_endpoint} (full request: ${synopsis})",
dlog("sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})",
     ("last_item_seen", peers_last_item_hash_seen)
peers_last_item_seen.item_hash = peers_last_item_hash_seen;
catch (const peer_is_on_an_unreachable_fork&)
dlog("Peer is on a fork and there's no set of blocks we can provide to switch them to our fork");
bool disconnect_from_inhibited_peer = false;
disconnect_from_inhibited_peer = true;
dlog("sync: peer is already in sync with us");
    !_delegate->has_item(peers_last_item_seen))
dlog("sync: peer is out of sync, sending peer ${count} items ids: first: ${first_item_id}, last: ${last_item_id}",
    !_delegate->has_item(peers_last_item_seen))
if (disconnect_from_inhibited_peer)
dlog("peer ${endpoint} which was handshaking with us has started synchronizing with us, "
     "start syncing with it",

uint32_t max_number_of_unfetched_items = 0;
uint32_t this_peer_unfetched_items_count = (uint32_t)peer->ids_of_items_to_get.size()
    + peer->number_of_unfetched_item_ids;
max_number_of_unfetched_items = std::max(max_number_of_unfetched_items,
                                         this_peer_unfetched_items_count);
return max_number_of_unfetched_items;

uint32_t number_of_blocks_after_reference_point = original_ids_of_items_to_get.size();
std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(reference_point,
                                                                       number_of_blocks_after_reference_point);
if (synopsis.empty())
  synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
if (reference_point != item_hash_t() && synopsis.empty())
  FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to");
if( number_of_blocks_after_reference_point )
uint32_t first_block_num_in_ids_to_get = _delegate->get_block_number(original_ids_of_items_to_get.front());
uint32_t true_high_block_num = first_block_num_in_ids_to_get + original_ids_of_items_to_get.size() - 1;
uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front());
if( low_block_num >= first_block_num_in_ids_to_get )
  synopsis.push_back(original_ids_of_items_to_get[low_block_num - first_block_num_in_ids_to_get]);
low_block_num += (true_high_block_num - low_block_num + 2 ) / 2;
while ( low_block_num <= true_high_block_num );
assert(synopsis.back() == original_ids_of_items_to_get.back());
if( reset_fork_tracking_data_for_peer )
dlog("sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, "
     "(full request is ${blockchain_synopsis})",
     ("last_item_seen", last_item_seen )
     ("blockchain_synopsis", blockchain_synopsis ) );
catch (const block_older_than_undo_history& e)
synopsis_exception = e;
if (synopsis_exception)

uint32_t first_block_number_in_reponse = _delegate->get_block_number(
auto total_items = uint32_t( blockchain_item_ids_inventory_message_received.item_hashes_available.size() );
for (uint32_t i = 1; i < total_items; ++i)
uint32_t actual_num = _delegate->get_block_number(
uint32_t expected_num = first_block_number_in_reponse + i;
if (actual_num != expected_num)
wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, "
     "the ${position}th block in their reply was block number ${actual_num}, "
     "but it should have been number ${expected_num}",
     ("actual_num", actual_num)
     ("expected_num", expected_num));
"You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, "
"the ${position}th block in their reply was block number ${actual_num}, "
"but it should have been number ${expected_num}",
     ("actual_num", actual_num)
     ("expected_num", expected_num)));
"You gave an invalid response to my request for sync blocks",
true, error_for_peer);
if (synopsis_sent_in_request.empty())
if (_delegate->get_block_number(first_item_hash) != 1)
wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks starting from the beginning of the chain, "
     "but they provided a list of blocks starting with ${first_block}",
     ("first_block", first_item_hash));
fc::exception error_for_peer(FC_LOG_MESSAGE(error,
    "You gave an invalid response for my request for sync blocks. I asked for blocks starting from the beginning of the chain, "
    "but you returned a list of blocks starting with ${first_block}",
    ("first_block", first_item_hash)));
"You gave an invalid response to my request for sync blocks",
true, error_for_peer);
if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end())
wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks based on the synopsis ${synopsis}, but they "
     "provided a list of blocks starting with ${first_block}",
     ("synopsis", synopsis_sent_in_request)
     ("first_block", first_item_hash));
fc::exception error_for_peer(FC_LOG_MESSAGE(error,
    "You gave an invalid response for my request for sync blocks. I asked for blocks following something in "
    "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices",
    ("synopsis", synopsis_sent_in_request)
    ("first_block", first_item_hash)));
"You gave an invalid response to my request for sync blocks",
true, error_for_peer);
dlog("sync: received a list of ${count} available items from ${peer_endpoint}",
dlog("sync: peer said we're up-to-date, entering normal operation with this peer" );
if( new_number_of_unfetched_items == 0 )
  _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 );
std::deque<item_hash_t> item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(),
if (!item_hashes_received.empty() &&
bool is_first_item_for_other_peer = false;
if (peer != originating_peer->shared_from_this() &&
    peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
dlog("The item ${newitem} is the first item for peer ${peer}",
     ("peer", peer->get_remote_endpoint()));
is_first_item_for_other_peer = true;
dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
     ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
if (!is_first_item_for_other_peer)
while (!item_hashes_received.empty() &&
       item_hashes_received.front())))
assert(item_hashes_received.front() != item_hash_t());
dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})",
     ("actual_block_num", _delegate->get_block_number(item_hashes_received.front())));
item_hashes_received.pop_front();
dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size()));
else if (!item_hashes_received.empty())
item_hashes_received.pop_front();
wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})",
     ("timestamp", minimum_time_of_last_offered_block));
fc::exception error_for_peer(FC_LOG_MESSAGE(error,
    "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}",
    ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block)
"You offered me a list of more sync blocks than could possibly exist",
true, error_for_peer);
    new_number_of_unfetched_items);
catch (const fc::canceled_exception&)
elog("Caught unexpected exception: ${e}", ("e", e));
assert(false && "exceptions not expected here");
catch (const std::exception& e)
elog("Caught unexpected exception: ${e}", ("e", e.what()));
assert(false && "exceptions not expected here");
elog("Caught unexpected exception, could break sync operation");
wlog("sync: received a list of sync items available from peer ${peer}, but I didn't ask for any!",
catch (fc::key_not_found_exception&)
catch (fc::key_not_found_exception&)
wlog("Unexpected fetch_items_message from peer ${peer}, disconnecting",
dlog("received items request for ids ${ids} of type ${type} from peer ${endpoint}",
     ("type", fetch_items_message_received.item_type)
std::list<message> reply_messages;
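// Requested items are looked up first in the recent-message cache and then via the delegate;
// the replies are accumulated in reply_messages and sent once the whole request has been processed.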
dlog("received item request for item ${id} from peer ${endpoint}, returning the item from my message cache",
     ("id", requested_message.id()));
reply_messages.push_back(requested_message);
last_block_message_sent = requested_message;
catch (fc::key_not_found_exception&)
dlog("received item request from peer ${endpoint}, returning the item from delegate with id ${id} size ${size}",
     ("id", requested_message.id())
     ("size", requested_message.size)
reply_messages.push_back(requested_message);
last_block_message_sent = requested_message;
catch (fc::key_not_found_exception&)
dlog("received item request from peer ${endpoint} but we don't have it",
if (last_block_message_sent)
for (const message& reply : reply_messages)

wlog("Peer ${peer} doesn't have the requested item ${item}.",
     ("item", requested_item) );
disconnect_from_peer(originating_peer, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.", true,
    fc::exception(FC_LOG_MESSAGE(error, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",
    ("item_id", requested_item))));
wlog("Peer ${peer} doesn't have the requested sync item ${item}. This really shouldn't happen",
     ("item", requested_item) );
dlog("Peer doesn't have an item we're looking for, which is fine because we weren't looking for it");
wlog("Unexpected item_ids_inventory_message from peer ${peer}, disconnecting",
dlog("received inventory of ${count} items from peer ${endpoint}",
item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash);
bool we_advertised_this_item_to_a_peer = false;
bool we_requested_this_item_from_a_peer = false;
if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end())
  we_advertised_this_item_to_a_peer = true;
if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end())
  we_requested_this_item_from_a_peer = true;
if (!we_advertised_this_item_to_a_peer)
if (!we_requested_this_item_from_a_peer)
dlog("not adding ${item_hash} to our list of items to fetch because we've recently fetched a copy and it failed to push",
     ("item_hash", item_hash));
dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
     ("item_hash", item_hash));

wlog("Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
     ("error", closing_connection_message_received.error ) );
"Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
     ("error", closing_connection_message_received.error ) ));
wlog("Peer ${peer} is disconnecting us because: ${msg}",
    && inbound_endpoint.valid() && inbound_endpoint->port() != 0 )
if (updated_peer_record)
update_address_seen_time( this, originating_peer );
ilog("Remote peer ${endpoint} closed their connection to us",
dlog("in send_sync_block_to_node_delegate()");
bool client_accepted_block = false;
bool discontinue_fetching_blocks_from_peer = false;
std::vector<message_hash_type> contained_transaction_msg_ids;
_delegate->handle_block(block_message_to_send, true, contained_transaction_msg_ids);
dlog("Successfully pushed sync block ${num} (id:${id})",
     ("id", block_message_to_send.block_id));
client_accepted_block = true;
catch (const block_older_than_undo_history& e)
wlog("Failed to push sync block ${num} (id:${id}): block is on a fork older than our undo history would "
     "allow us to switch to: ${e}",
     ("id", block_message_to_send.block_id)
handle_message_exception = e;
discontinue_fetching_blocks_from_peer = true;
catch (const fc::canceled_exception&)
wlog("Failed to push sync block ${num} (id:${id}): client rejected sync block sent by peer: ${e}",
     ("id", block_message_to_send.block_id)
if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
handle_message_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
     ("block_num", block_num)
     ("block_id", block_message_to_send.block_id) ) );
handle_message_exception = e;

std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
std::set<peer_connection_ptr> peers_we_need_to_sync_to;
std::map<peer_connection_ptr, std::pair<std::string, fc::oexception> > peers_to_disconnect;
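// Bookkeeping collected while walking the peer list: peers whose fetch lists just became
// empty (candidates for a fresh sync check), peers we need to restart synchronization with,
// and peers to disconnect together with the reason string and exception to send them.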
if( client_accepted_block )
dlog("sync: client accepted the block, we now have only ${count} items left to fetch before we're in sync",
bool disconnecting_this_peer = false;
if (peer->last_known_fork_block_number != 0)
if (next_fork_block_number != 0 &&
    next_fork_block_number <= block_message_to_send.block.block_num())
std::ostringstream disconnect_reason_stream;
disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_message_to_send.block.block_num();
peers_to_disconnect[peer] = std::make_pair(disconnect_reason_stream.str(),
#ifdef ENABLE_DEBUG_ULOGS
ulog("Disconnecting from peer during sync because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
disconnecting_this_peer = true;
if (!disconnecting_this_peer &&
    peer->ids_of_items_to_get.empty() && peer->ids_of_items_being_processed.empty())
dlog("Cannot pop first element off peer ${peer}'s list, its list is empty", ("peer", peer->get_remote_endpoint() ) );
if (!peer->peer_needs_sync_items_from_us && !peer->we_need_sync_items_from_peer)
dlog("We will be restarting synchronization with peer ${peer}", ("peer", peer->get_remote_endpoint()));
peers_we_need_to_sync_to.insert(peer);
else if (!disconnecting_this_peer)
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(block_message_to_send.block_id);
if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
peer->last_block_delegate_has_seen = block_message_to_send.block_id;
peer->last_block_time_delegate_has_seen = block_message_to_send.block.timestamp;
peer->ids_of_items_being_processed.erase(items_being_processed_iter);
dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
     ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
if (peer->ids_of_items_to_get.empty() &&
    peer->number_of_unfetched_item_ids == 0 &&
    peer->ids_of_items_being_processed.empty())
  peers_with_newly_empty_item_lists.insert(peer);
if (peer->ids_of_items_being_processed.find(block_message_to_send.block_id)
    != peer->ids_of_items_being_processed.end())
if (discontinue_fetching_blocks_from_peer)
wlog("inhibiting fetching sync blocks from peer ${endpoint} because it is on a fork that's too old",
     ("endpoint", peer->get_remote_endpoint()));
peer->inhibit_fetching_sync_blocks = true;
peers_to_disconnect[peer] = std::make_pair(
    std::string("You offered us a block that we reject as invalid"),
for (auto& peer_to_disconnect : peers_to_disconnect)
std::string reason_string;
std::tie(reason_string, reason_exception) = peer_to_disconnect.second;
wlog("disconnecting client ${endpoint} because it offered us the rejected block",
     ("endpoint", peer->get_remote_endpoint()));
dlog("Leaving send_sync_block_to_node_delegate");
"process_backlog_of_sync_blocks");
if (calls_iter->ready())
dlog("in process_backlog_of_sync_blocks");
dlog("leaving process_backlog_of_sync_blocks because we're already processing too many blocks");
dlog("resuming processing sync block backlog because we only ${count} blocks in progress",
bool block_processed_this_iteration;
size_t blocks_processed = 0;
std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
std::set<peer_connection_ptr> peers_we_need_to_sync_to;
std::map<peer_connection_ptr, fc::oexception> peers_with_rejected_block;
block_processed_this_iteration = false;
    ++received_block_iter)
bool potential_first_block = false;
if (!peer->ids_of_items_to_get.empty() &&
    peer->ids_of_items_to_get.front() == received_block_iter->block_id)
potential_first_block = true;
peer->ids_of_items_to_get.pop_front();
peer->ids_of_items_being_processed.insert(received_block_iter->block_id);
if (potential_first_block)
}, "send_sync_block_to_node_delegate"));
block_processed_this_iteration = true;
dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
std::vector< peer_connection_ptr > peers_needing_next_batch;
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
peer->ids_of_items_being_processed.erase(items_being_processed_iter);
dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
     ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
if (peer->ids_of_items_to_get.empty() &&
    peer->number_of_unfetched_item_ids == 0 &&
    peer->ids_of_items_being_processed.empty())
dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
peers_needing_next_batch.push_back( peer );
dlog("stopping processing sync block backlog because we have ${count} blocks in progress",
} while (block_processed_this_iteration);
dlog("leaving process_backlog_of_sync_blocks, ${count} processed", ("count", blocks_processed));
"process_backlog_of_sync_blocks" );
dlog("received a sync block from peer ${endpoint}", ("endpoint", originating_peer->get_remote_endpoint() ) );

dlog("received a block from peer ${endpoint}, passing it to client",
std::set<peer_connection_ptr> peers_to_disconnect;
std::string disconnect_reason;
std::vector<message_hash_type> contained_transaction_msg_ids;
_delegate->handle_block(block_message_to_process, false, contained_transaction_msg_ids);
dlog("Successfully pushed block ${num} (id:${id})",
     ("id", block_message_to_process.block_id));
bool new_transaction_discovered = false;
for (const item_hash_t& transaction_message_hash : contained_transaction_msg_ids)
if (new_transaction_discovered)
dlog("Already received and accepted this block (presumably through sync mechanism), treating it as accepted" );
dlog("client validated the block, advertising it to other peers" );
uint32_t block_number = block_message_to_process.block.block_num();
auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id);
if (iter != peer->inventory_peer_advertised_to_us.end())
peer->last_block_delegate_has_seen = block_message_to_process.block_id;
peer->last_block_time_delegate_has_seen = block_time;
peer->clear_old_inventory();
broadcast( block_message_to_process, propagation_data );
if (peer->last_known_fork_block_number != 0)
if (next_fork_block_number != 0 &&
    next_fork_block_number <= block_number)
peers_to_disconnect.insert(peer);
#ifdef ENABLE_DEBUG_ULOGS
ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
if (!peers_to_disconnect.empty())
std::ostringstream disconnect_reason_stream;
disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_number;
disconnect_reason = disconnect_reason_stream.str();
disconnect_exception = fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
     ("block_number", block_number)));
catch (const fc::canceled_exception&)
catch (const unlinkable_block_exception& e)
restart_sync_exception = e;
wlog("Failed to push block ${num} (id:${id}), client rejected block sent by peer: ${e}",
     ("id", block_message_to_process.block_id)
if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
disconnect_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
     ("block_num", block_num)
     ("block_id", block_message_to_process.block_id) ) );
disconnect_exception = e;
disconnect_reason = "You offered me a block that I have deemed to be invalid";
peers_to_disconnect.insert( originating_peer->shared_from_this() );
if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id)
  peers_to_disconnect.insert(peer);
if (restart_sync_exception)
wlog("Peer ${peer} sent me a block that didn't link to our blockchain. Restarting sync mode with them to get the missing block. "
     "Error pushing block was: ${e}",
     ("e", *restart_sync_exception));
wlog("disconnecting client ${endpoint} because it offered us the rejected block",
     ("endpoint", peer->get_remote_endpoint()));
    const message& message_to_process,
if (originating_peer->idle())
if (originating_peer->idle())
catch (const fc::canceled_exception& e)
elog("Caught unexpected exception: ${e}", ("e", e));
assert(false && "exceptions not expected here");
catch (const std::exception& e)
elog("Caught unexpected exception: ${e}", ("e", e.what()));
assert(false && "exceptions not expected here");
elog("Caught unexpected exception, could break sync operation");
wlog("received a block ${block_id} I didn't ask for from peer ${endpoint}, disconnecting from peer",
     ("block_id", block_message_to_process.block_id));
     ("block_id", block_message_to_process.block_id)
disconnect_from_peer(originating_peer, "You sent me a block that I didn't ask for", true, detailed_error);
    request_received_time);
constexpr uint8_t two = 2;
    - reply_received_time) ).count() / two );

    const message& message_to_process,
wlog("received a message I didn't ask for from peer ${endpoint}, disconnecting from peer",
"You sent me a message that I didn't ask for, message_hash: ${message_hash}",
     ("message_hash", message_hash ) ) );
disconnect_from_peer( originating_peer, "You sent me a message that I didn't request", true, detailed_error );
if (originating_peer->idle())
dlog("passing message containing transaction ${trx} to client",
     ("trx", transaction_message_to_process.trx.id()) );
_delegate->handle_transaction(transaction_message_to_process);
_delegate->handle_message( message_to_process );
catch (const fc::canceled_exception& )
case graphene::chain::duplicate_transaction::code_enum::code_value :
case graphene::chain::limit_order_create_kill_unfilled::code_enum::code_value :
case graphene::chain::limit_order_create_market_not_whitelisted::code_enum::code_value :
case graphene::chain::limit_order_create_market_blacklisted::code_enum::code_value :
case graphene::chain::limit_order_create_selling_asset_unauthorized::code_enum::code_value :
case graphene::chain::limit_order_create_receiving_asset_unauthorized::code_enum::code_value :
case graphene::chain::limit_order_create_insufficient_balance::code_enum::code_value :
case graphene::chain::limit_order_update_nonexist_order::code_enum::code_value :
case graphene::chain::limit_order_update_owner_mismatch::code_enum::code_value :
case graphene::chain::limit_order_cancel_nonexist_order::code_enum::code_value :
case graphene::chain::limit_order_cancel_owner_mismatch::code_enum::code_value :
case graphene::chain::liquidity_pool_exchange_unfillable_price::code_enum::code_value :
  dlog("client rejected message sent by peer ${peer}, ${e}",
wlog("client rejected message sent by peer ${peer}, ${e}",
broadcast( message_to_process, propagation_data );

peer->ids_of_items_to_get.clear();
peer->number_of_unfetched_item_ids = 0;
peer->we_need_sync_items_from_peer = true;
peer->last_block_delegate_has_seen = item_hash_t();
peer->inhibit_fetching_sync_blocks = false;
3693 for(
const auto& potential_inbound_endpoint : peer->potential_inbound_endpoints )
      wlog( "Exception thrown while closing P2P peer database, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while closing P2P peer database, ignoring" );
      // ...
      dlog( "P2P TCP server closed" );
      // ...
      wlog( "Exception thrown while closing P2P TCP server, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while closing P2P TCP server, ignoring" );
      // ...
      dlog( "P2P accept loop terminated" );
      // ...
      wlog( "Exception thrown while terminating P2P accept loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating P2P accept loop, ignoring" );
      // ...
      dlog( "P2P connect loop terminated" );
      // ...
      catch ( const fc::canceled_exception& )
      {
        dlog( "P2P connect loop terminated" );
      }
      // ...
      wlog( "Exception thrown while terminating P2P connect loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating P2P connect loop, ignoring" );
      // ...
      dlog( "Process backlog of sync items task terminated" );
      // ...
      catch ( const fc::canceled_exception& )
      {
        dlog( "Process backlog of sync items task terminated" );
      }
      // ...
      wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring" );
      // ...
      size_t handle_message_call_count = 0;
      // ...
        if( it->ready() || it->error() || it->canceled() )
        // ...
        ++handle_message_call_count;
        // ...
          it->cancel_and_wait("node_impl::close()");
          dlog( "handle_message call #${count} task terminated", ("count", handle_message_call_count) );
        // ...
        catch ( const fc::canceled_exception& )
        {
          dlog( "handle_message call #${count} task terminated", ("count", handle_message_call_count) );
        }
        // ...
          wlog( "Exception thrown while terminating handle_message call #${count} task, ignoring: ${e}",
                ("e", e)("count", handle_message_call_count) );
        // ...
          wlog( "Exception thrown while terminating handle_message call #${count} task, ignoring",
                ("count", handle_message_call_count) );
      dlog( "Fetch sync items loop terminated" );
      // ...
      catch ( const fc::canceled_exception& )
      {
        dlog( "Fetch sync items loop terminated" );
      }
      // ...
      wlog( "Exception thrown while terminating Fetch sync items loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Fetch sync items loop, ignoring" );
      // ...
      dlog( "Fetch items loop terminated" );
      // ...
      catch ( const fc::canceled_exception& )
      {
        dlog( "Fetch items loop terminated" );
      }
      // ...
      wlog( "Exception thrown while terminating Fetch items loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Fetch items loop, ignoring" );
      // ...
      dlog( "Advertise inventory loop terminated" );
      // ...
      catch ( const fc::canceled_exception& )
      {
        dlog( "Advertise inventory loop terminated" );
      }
      // ...
      wlog( "Exception thrown while terminating Advertise inventory loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Advertise inventory loop, ignoring" );
      // ...
      std::list<peer_connection_ptr> all_peers;
      auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); };
      // ...
          peer->destroy_connection();
        // ...
          wlog( "Exception thrown while closing peer connection, ignoring: ${e}", ("e", e) );
        // ...
          wlog( "Exception thrown while closing peer connection, ignoring" );
#ifdef USE_PEERS_TO_DELETE_MUTEX
      // ...
      dlog( "Delayed peer deletion task terminated" );
      // ...
      wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring" );
      // ...
      dlog( "Kill inactive connections loop terminated" );
      // ...
      wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring" );
      // ...
      dlog( "Fetch updated peer lists loop terminated" );
      // ...
      wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring" );
      // ...
      dlog( "Update seed nodes loop terminated" );
      // ...
      wlog( "Exception thrown while terminating Update seed nodes loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Update seed nodes loop, ignoring" );
      // ...
      dlog( "Bandwidth monitor loop terminated" );
      // ...
      wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring" );
      // ...
      dlog( "Dump node status task terminated" );
      // ...
      wlog( "Exception thrown while terminating Dump node status task, ignoring: ${e}", ("e", e) );
      // ...
      wlog( "Exception thrown while terminating Dump node status task, ignoring" );
      new_peer->accept_connection();
      // ...
      ilog( "accepted inbound connection from ${remote_endpoint}",
            ("remote_endpoint", new_peer->get_socket().remote_endpoint() ) );
      // ...
      std::weak_ptr<peer_connection> new_weak_peer(new_peer);
      new_peer->accept_or_connect_task_done = fc::async( [this, new_weak_peer]() {
      // ...
      }, "accept_connection_task" );
      // ...
      fc::sha512 shared_secret = peer->get_shared_secret();
      shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
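      // The digest built from the shared secret above is presumably what gets signed with this
      // node's private key, so the remote side can verify that we actually control the node_id
      // we advertise during the handshake.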
      uint16_t inbound_port = 0;
      uint16_t outbound_port = 0;
      // ...
      outbound_port = local_endpoint.port();
      // ...
      peer->send_message(message(hello));
      // ...
      ilog( "Connecting to peer ${peer}", ("peer", remote_endpoint) );
      // ...
      new_peer->connect_to( remote_endpoint, bind_to_endpoint );
      // ...
      updated_peer_record.number_of_successful_connection_attempts++;
      // ...
      connect_failed_exception = except;
      // ...
      if( connect_failed_exception )
      // ...
        updated_peer_record.number_of_failed_connection_attempts++;
        if (new_peer->connection_closed_error)
          updated_peer_record.last_error = *new_peer->connection_closed_error;
        else
          updated_peer_record.last_error = *connect_failed_exception;
      // ...
        throw *connect_failed_exception;
      // ...
      new_peer->inbound_address = local_endpoint.get_address();
      // ...
      new_peer->outbound_port = local_endpoint.port();
      // ...
      ilog( "Sent \"hello\" to peer ${peer}", ("peer", new_peer->get_remote_endpoint()) );
      _delegate = std::make_unique<statistics_gathering_node_delegate_wrapper>(del, thread_for_delegate_calls);
      // ...
      bool node_configuration_loaded = false;
      // ...
        ilog( "Loaded configuration from file ${filename}", ("filename", configuration_file_name ) );
        // ...
        node_configuration_loaded = true;
      // ...
      catch ( fc::parse_error_exception& parse_error )
      {
        elog( "malformed node configuration file ${filename}: ${error}",
              ("filename", configuration_file_name )("error", parse_error.to_detail_string() ) );
      }
      // ...
        elog( "unexpected exception while reading configuration file ${filename}: ${error}",
              ("filename", configuration_file_name )("error", except.to_detail_string() ) );
      // ...
      if( !node_configuration_loaded )
      // ...
#ifdef GRAPHENE_TEST_NETWORK
      // ...
        ilog( "generating new private key for this node" );
      // ...
        elog( "unable to open peer database ${filename}: ${error}",
              ("filename", potential_peer_database_file_name)("error", except.to_detail_string()) );
      // ...
      wlog( "accept_incoming_connections is false, p2p network will not accept any incoming connections" );
      if( listen_endpoint.port() != 0 )
      // ...
        bool listen_failed = false;
        // ...
            temporary_server.listen( listen_endpoint );
          // ...
            temporary_server.listen( listen_endpoint.port() );
        // ...
          listen_failed = true;
        // ...
          std::ostringstream error_message_stream;
          // ...
            error_message_stream << "Unable to listen for connections on port " << listen_endpoint.port()
                                 << ", retrying in a few seconds\n";
            error_message_stream << "You can wait for it to become available, or restart this program using\n";
            error_message_stream << "the --p2p-endpoint option to specify another port\n";
          // ...
            error_message_stream << "\nStill waiting for port " << listen_endpoint.port() << " to become available\n";
          // ...
          std::string error_message = error_message_stream.str();
          wlog(error_message);
          std::cout << "\033[31m" << error_message;
      // ...
        wlog( "unable to bind on the requested endpoint ${endpoint}, which probably means that endpoint is already in use",
              ("endpoint", listen_endpoint ) );
      // ...
      ilog( "listening for connections on endpoint ${endpoint} (our first choice)", /* ... */ );
      // ...
      FC_RETHROW_EXCEPTION( e, error, "unable to listen on ${endpoint}", ("endpoint", listen_endpoint ) );
4399 "p2p_network_connect_loop" );
4403 "advertise_inventory_loop" );
4405 "kill_inactive_conns_loop" );
4407 "fetch_updated_peer_lists_loop");
4427 auto delay_until_retry =
fc::seconds( (updated_peer_record.number_of_failed_connection_attempts + 1)
4429 updated_peer_record.last_connection_attempt_time
4430 = std::min<fc::time_point_sec>( updated_peer_record.last_connection_attempt_time,
    static std::vector<fc::ip::endpoint> resolve_string_to_ip_endpoints(const std::string& in)
    {
      // ...
      std::string::size_type colon_pos = in.find(':');
      if (colon_pos == std::string::npos)
        FC_THROW( "Missing required port number in endpoint string \"${endpoint_string}\"",
                  ("endpoint_string", in) );
      std::string port_string = in.substr(colon_pos + 1);
      // ...
        uint16_t port = boost::lexical_cast<uint16_t>(port_string);
        // ...
        std::string hostname = in.substr(0, colon_pos);
        std::vector<fc::ip::endpoint> endpoints = fc::resolve(hostname, port);
        if (endpoints.empty())
          // ...
                    "The host name can not be resolved: ${hostname}",
                    ("hostname", hostname) );
      // ...
      catch (const boost::bad_lexical_cast&)
      {
        FC_THROW( "Bad port: ${port}", ("port", port_string) );
      }
      std::vector<fc::ip::endpoint> endpoints;
      ilog( "Resolving seed node ${endpoint}", ("endpoint", endpoint_string) );
      // ...
        endpoints = resolve_string_to_ip_endpoints(endpoint_string);
      // ...
        wlog( "Unable to resolve endpoint during attempt to add seed node ${ep}", ("ep", endpoint_string) );
      // ...
        ilog( "Adding seed node ${endpoint}", ("endpoint", endpoint) );
      // ...
      new_peer->get_socket().open();
      new_peer->get_socket().set_reuse_address();
      // ...
      std::weak_ptr<peer_connection> new_weak_peer(new_peer);
      new_peer->accept_or_connect_task_done = fc::async([this, new_weak_peer](){
      // ...
      }, "connect_to_task");
      // ...
        FC_THROW_EXCEPTION(already_connected_to_requested_peer,
                           "already connected to requested endpoint ${endpoint}",
                           ("endpoint", remote_endpoint));
      // ...
      dlog( "node_impl::connect_to_endpoint(${endpoint})", ("endpoint", remote_endpoint) );
      // ...
      new_peer->set_remote_endpoint(remote_endpoint);
      // ...
            && endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
      // ...
        if( active_peer->additional_inbound_endpoints.find( remote_endpoint )
            != active_peer->additional_inbound_endpoints.end() )
      // ...
        if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
          return handshaking_peer;
        // ...
        if( handshaking_peer->additional_inbound_endpoints.find( remote_endpoint )
            != handshaking_peer->additional_inbound_endpoints.end() )
          return handshaking_peer;
      ilog( "----------------- PEER STATUS UPDATE --------------------" );
      ilog( " number of peers: ${active} active, ${handshaking} handshaking, ${closing} closing. "
            " attempting to maintain ${desired} - ${maximum} peers",
      // ...
        ilog( " active peer ${endpoint} [${direction}] (${inbound_ep} ${is_firewalled}) "
              "peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}",
              ("endpoint", peer->get_remote_endpoint() )
              ("direction", peer->direction )
              ("inbound_ep", peer->get_endpoint_for_connecting() )
              ("is_firewalled", peer->is_firewalled)
              ("in_sync_with_us", !peer->peer_needs_sync_items_from_us )
              ("in_sync_with_them", !peer->we_need_sync_items_from_peer ) );
        if( peer->we_need_sync_items_from_peer )
          ilog( " above peer has ${count} sync items we might need",
                ("count", peer->ids_of_items_to_get.size() ) );
        if (peer->inhibit_fetching_sync_blocks)
          ilog( " we are not fetching sync blocks from the above peer "
                "(inhibit_fetching_sync_blocks == true)" );
      // ...
        ilog( " handshaking peer ${endpoint} [${direction}] in state ours(${our_state}) theirs(${their_state})",
              ("endpoint", peer->get_remote_endpoint() )
              ("direction", peer->direction )
              ("our_state", peer->our_state )("their_state", peer->their_state ) );
      // ...
      ilog( "--------- MEMORY USAGE ------------" );
      // ...
        ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) );
        ilog( " peer.ids_of_items_to_get size: ${size}", ("size", peer->ids_of_items_to_get.size() ) );
        ilog( " peer.inventory_peer_advertised_to_us size: ${size}", ("size", peer->inventory_peer_advertised_to_us.size() ) );
        ilog( " peer.inventory_advertised_to_peer size: ${size}", ("size", peer->inventory_advertised_to_peer.size() ) );
        ilog( " peer.items_requested_from_peer size: ${size}", ("size", peer->items_requested_from_peer.size() ) );
        ilog( " peer.sync_items_requested_from_peer size: ${size}", ("size", peer->sync_items_requested_from_peer.size() ) );
      // ...
      ilog( "--------- END MEMORY USAGE ------------" );
                                          const std::string& reason_for_disconnect,
                                          bool caused_by_error,
      // ...
        dlog( "Disconnecting again from ${peer} for ${reason}, ignore",
      // ...
      if( inbound_endpoint.valid() && inbound_endpoint->port() != 0 )
      // ...
        if (updated_peer_record)
        // ...
          updated_peer_record->last_error = error;
      // ...
      if (caused_by_error)
      // ...
        std::ostringstream error_message;
        // ...
                      << " for reason: " << reason_for_disconnect;
        dlog(error_message.str());
      // ...
      dlog( "Disconnecting from ${peer} for ${reason}",
            ("peer", peer_to_disconnect->get_remote_endpoint())
            ("reason", reason_for_disconnect) );
      std::vector<peer_status> statuses;
      // ...
        peer_details["addrlocal"] = (std::string)peer->get_local_endpoint();
        peer_details["services"] = "00000001";
        peer_details["lastsend"] = peer->get_last_message_sent_time().sec_since_epoch();
        peer_details["lastrecv"] = peer->get_last_message_received_time().sec_since_epoch();
        peer_details["bytessent"] = peer->get_total_bytes_sent();
        peer_details["bytesrecv"] = peer->get_total_bytes_received();
        peer_details["conntime"] = peer->get_connection_time();
        peer_details["pingtime"] = "";
        peer_details["pingwait"] = "";
        peer_details["version"] = "";
        peer_details["subver"] = peer->user_agent;
        // ...
        peer_details["firewall_status"] = fc::variant( peer->is_firewalled, 1 );
        peer_details["startingheight"] = "";
        peer_details["banscore"] = "";
        peer_details["syncnode"] = "";
        // ...
        if (peer->fc_git_revision_sha)
        {
          std::string revision_string = *peer->fc_git_revision_sha;
          // ...
            revision_string += " (same as ours)";
          // ...
            revision_string += " (different from ours)";
          peer_details["fc_git_revision_sha"] = revision_string;
        }
        // ...
        if (peer->fc_git_revision_unix_timestamp)
        {
          // ...
          peer_details["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp;
          // ...
            age_string += " (same as ours)";
          // ...
            age_string += " (newer than ours)";
          // ...
            age_string += " (older than ours)";
          peer_details["fc_git_revision_age"] = age_string;
        }
        // ...
          peer_details["platform"] = *peer->platform;
        // ...
        peer_details["current_head_block"] = fc::variant( peer->last_block_delegate_has_seen, 1 );
        peer_details["current_head_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen);
        peer_details["current_head_block_time"] = peer->last_block_time_delegate_has_seen;
        // ...
        peer_details["peer_needs_sync_items_from_us"] = peer->peer_needs_sync_items_from_us;
        peer_details["we_need_sync_items_from_peer"] = peer->we_need_sync_items_from_peer;
        // ...
        this_peer_status.info = peer_details;
        statuses.push_back(this_peer_status);
        hash_of_message_contents = block_message_to_broadcast.block_id;
      // ...
        hash_of_message_contents = transaction_message_to_broadcast.trx.id();
        dlog( "broadcasting trx: ${trx}", ("trx", transaction_message_to_broadcast) );
      // ...
      broadcast( item_to_broadcast, propagation_data );
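      // Presumably the content hash captured above (block_id for blocks, transaction id for
      // transactions) is what the message cache indexes on, so a later inventory notice for the
      // same item can be matched regardless of which peer relays it.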
      std::vector<potential_peer_record> result;
      // ...
        result.push_back(*itr);
      // ...
      if (params.contains("peer_connection_retry_timeout"))
      // ...
      if (params.contains("desired_number_of_connections"))
      // ...
      if (params.contains("maximum_number_of_connections"))
      // ...
      if (params.contains("max_addresses_to_handle_at_once"))
      // ...
      if (params.contains("max_blocks_to_handle_at_once"))
      // ...
      if (params.contains("max_sync_blocks_to_prefetch"))
      // ...
      if (params.contains("max_sync_blocks_per_peer"))
      // ...
                           "I have too many connections open");
#ifdef ENABLE_P2P_DEBUGGING_API
      _allowed_peers.clear();
      _allowed_peers.insert(allowed_peers.begin(), allowed_peers.end());
      std::list<peer_connection_ptr> peers_to_disconnect;
      if (!_allowed_peers.empty())
      // ...
          if (_allowed_peers.find(peer->node_id) == _allowed_peers.end())
            peers_to_disconnect.push_back(peer);
      // ...
        disconnect_from_peer(peer.get(),
                             "My allowed_peers list has changed, and you're no longer allowed.  Bye.");
#endif // ENABLE_P2P_DEBUGGING_API
      return _delegate->get_call_statistics();
      // ...
      std::vector<uint32_t> network_usage_by_second;
      // ...
                     std::back_inserter(network_usage_by_second),
                     std::plus<uint32_t>());
      std::vector<uint32_t> network_usage_by_minute;
      // ...
                     std::back_inserter(network_usage_by_minute),
                     std::plus<uint32_t>());
      std::vector<uint32_t> network_usage_by_hour;
      // ...
                     std::back_inserter(network_usage_by_hour),
                     std::plus<uint32_t>());
      // ...
      result["usage_by_second"] = fc::variant( network_usage_by_second, 2 );
      result["usage_by_minute"] = fc::variant( network_usage_by_minute, 2 );
      result["usage_by_hour"]   = fc::variant( network_usage_by_hour, 2 );
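      // The std::plus<uint32_t> used with std::back_inserter above suggests each vector is
      // produced by combining two per-interval counters element-wise (apparently the read and
      // write byte counts) before being returned in the "usage_by_*" entries of the result.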
#ifdef P2P_IN_DEDICATED_THREAD
# define INVOKE_IN_IMPL(method_name, ...) \
    return my->_thread->async([&](){ return my->method_name(__VA_ARGS__); }, "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait()
#else
# define INVOKE_IN_IMPL(method_name, ...) \
    return my->method_name(__VA_ARGS__)
#endif // P2P_IN_DEDICATED_THREAD
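// Illustrative sketch (not taken from the original file): the public node methods presumably
// forward into node_impl through this macro, e.g.
//
//   void node::broadcast( const message& msg )
//   {
//     INVOKE_IN_IMPL( broadcast, msg );
//   }
//
// so that, when P2P_IN_DEDICATED_THREAD is defined, the body runs on the P2P thread and the
// caller blocks on wait() until it finishes; otherwise it is a plain direct call.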
      my(new detail::node_impl(user_agent), detail::node_impl_deleter())
    // ...
                                            uint32_t download_bytes_per_second) const
#define ROLLING_WINDOW_SIZE 1000
#define INITIALIZE_ACCUMULATOR(r, data, method_name) \
    , BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator))(boost::accumulators::tag::rolling_window::window_size = ROLLING_WINDOW_SIZE) \
    , BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator))(boost::accumulators::tag::rolling_window::window_size = ROLLING_WINDOW_SIZE) \
    , BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator))(boost::accumulators::tag::rolling_window::window_size = ROLLING_WINDOW_SIZE)

    // ... (statistics_gathering_node_delegate_wrapper constructor)
                                             std::shared_ptr<node_delegate> delegate,
                                             fc::thread* thread_for_delegate_calls) :
      _node_delegate(delegate),
      _thread(thread_for_delegate_calls)
    // ...
#undef INITIALIZE_ACCUMULATOR
      std::ostringstream note;
      note << "All times are in microseconds, mean is the average of the last " << ROLLING_WINDOW_SIZE << " call times";
      statistics["_note"] = note.str();

#define ADD_STATISTICS_FOR_METHOD(r, data, method_name) \
      fc::mutable_variant_object BOOST_PP_CAT(method_name, _stats); \
      BOOST_PP_CAT(method_name, _stats)["min"] = boost::accumulators::min(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["mean"] = boost::accumulators::rolling_mean(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["max"] = boost::accumulators::max(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["sum"] = boost::accumulators::sum(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_before_min"] = boost::accumulators::min(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_before_mean"] = boost::accumulators::rolling_mean(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_before_max"] = boost::accumulators::max(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_before_sum"] = boost::accumulators::sum(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_before_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_after_min"] = boost::accumulators::min(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_after_mean"] = boost::accumulators::rolling_mean(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_after_max"] = boost::accumulators::max(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["delay_after_sum"] = boost::accumulators::sum(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _delay_after_accumulator))); \
      BOOST_PP_CAT(method_name, _stats)["count"] = boost::accumulators::count(BOOST_PP_CAT(_, BOOST_PP_CAT(method_name, _execution_accumulator))); \
      statistics[BOOST_PP_STRINGIZE(method_name)] = BOOST_PP_CAT(method_name, _stats);
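// The macro above is presumably expanded once per delegate method (e.g. via BOOST_PP_SEQ_FOR_EACH
// over the sequence of node_delegate method names) so that `statistics` ends up holding one
// variant object of min/mean/max/sum/count figures per wrapped method.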
#undef ADD_STATISTICS_FOR_METHOD
#ifdef VERBOSE_NODE_DELEGATE_LOGGING
# define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \
    std::shared_ptr<call_statistics_collector> statistics_collector = std::make_shared<call_statistics_collector>( \
      /* ... */ \
      &_ ## method_name ## _execution_accumulator, \
      &_ ## method_name ## _delay_before_accumulator, \
      &_ ## method_name ## _delay_after_accumulator); \
    if (_thread->is_current()) \
    { \
      call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
      return _node_delegate->method_name(__VA_ARGS__); \
    } \
    /* ... */ \
      return _thread->async([&, statistics_collector](){ \
        call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
        return _node_delegate->method_name(__VA_ARGS__); \
      }, "invoke " BOOST_STRINGIZE(method_name)).wait(); \
    /* ... */ \
    catch (const fc::exception& e) \
    { \
      dlog("node_delegate threw fc::exception: ${e}", ("e", e)); \
      /* ... */ \
    } \
    catch (const std::exception& e) \
    { \
      dlog("node_delegate threw std::exception: ${e}", ("e", e.what())); \
      /* ... */ \
    } \
    /* ... */ \
      dlog("node_delegate threw unrecognized exception"); \
    /* ... */
#else
# define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \
    std::shared_ptr<call_statistics_collector> statistics_collector = std::make_shared<call_statistics_collector>( \
      /* ... */ \
      &_ ## method_name ## _execution_accumulator, \
      &_ ## method_name ## _delay_before_accumulator, \
      &_ ## method_name ## _delay_after_accumulator); \
    if (_thread->is_current()) \
    { \
      call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
      return _node_delegate->method_name(__VA_ARGS__); \
    } \
    return _thread->async([&, statistics_collector](){ \
      call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
      return _node_delegate->method_name(__VA_ARGS__); \
    }, "invoke " BOOST_STRINGIZE(method_name)).wait()
#endif // VERBOSE_NODE_DELEGATE_LOGGING
      // ... (statistics_gathering_node_delegate_wrapper forwarding methods)
                                 bool sync_mode, std::vector<message_hash_type>& contained_transaction_msg_ids)
      // ...
                                 uint32_t& remaining_item_count,
      // ...
        return _node_delegate->get_block_number(block_id);
      // ...
#undef INVOKE_AND_COLLECT_STATISTICS
      // ...
      for( const std::string& endpoint_string : seeds )
      // ...
          wlog( "caught exception ${e} while adding seed node ${endpoint}",
      // ...
                                 const std::vector<std::string>& advertise_or_exclude_list ) const