BitShares-Core  7.0.2
BitShares blockchain node software and command-line wallet software
node.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #include <sstream>
25 #include <iomanip>
26 #include <deque>
27 #include <unordered_set>
28 #include <list>
29 #include <forward_list>
30 #include <iostream>
31 #include <algorithm>
32 #include <tuple>
33 #include <string>
34 #include <boost/tuple/tuple.hpp>
35 #include <boost/circular_buffer.hpp>
36 
37 #include <boost/multi_index_container.hpp>
38 #include <boost/multi_index/ordered_index.hpp>
39 #include <boost/multi_index/mem_fun.hpp>
40 #include <boost/multi_index/member.hpp>
41 #include <boost/multi_index/random_access_index.hpp>
42 #include <boost/multi_index/tag.hpp>
43 #include <boost/multi_index/sequenced_index.hpp>
44 #include <boost/multi_index/hashed_index.hpp>
45 #include <boost/logic/tribool.hpp>
46 #include <boost/range/algorithm_ext/push_back.hpp>
47 #include <boost/range/algorithm/find.hpp>
48 #include <boost/range/numeric.hpp>
49 
50 #include <boost/preprocessor/seq/for_each.hpp>
51 #include <boost/preprocessor/cat.hpp>
52 #include <boost/preprocessor/stringize.hpp>
53 
54 #include <fc/thread/thread.hpp>
55 #include <fc/thread/future.hpp>
57 #include <fc/thread/mutex.hpp>
59 #include <fc/log/logger.hpp>
60 #include <fc/io/json.hpp>
61 #include <fc/io/enum_type.hpp>
62 #include <fc/io/raw.hpp>
63 #include <fc/crypto/rand.hpp>
64 #include <fc/network/ip.hpp>
65 #include <fc/network/resolve.hpp>
66 
67 #include <graphene/net/node.hpp>
71 #include <graphene/net/config.hpp>
73 
76 // Nasty hack: A circular dependency around fee_schedule is resolved by fwd-declaring it and using a shared_ptr
77 // to it in chain_parameters, which is used in an operation and thus must be serialized by the net library.
78 // Resolving that forward declaration doesn't happen until now:
80 
81 #include <fc/git_revision.hpp>
82 
83 #include "node_impl.hxx"
84 
85 namespace graphene { namespace net { namespace detail {
86 
88  {
89  ++block_clock;
90  if( block_clock > cache_duration_in_blocks )
91  _message_cache.get<block_clock_index>().erase(_message_cache.get<block_clock_index>().begin(),
92  _message_cache.get<block_clock_index>().lower_bound(block_clock - cache_duration_in_blocks ) );
93  }
94 
96  const message_hash_type& hash_of_message_to_cache,
97  const message_propagation_data& propagation_data,
98  const message_hash_type& message_content_hash )
99  {
100  _message_cache.insert( message_info(hash_of_message_to_cache,
101  message_to_cache,
102  block_clock,
103  propagation_data,
104  message_content_hash ) );
105  }
106 
107  message blockchain_tied_message_cache::get_message( const message_hash_type& hash_of_message_to_lookup ) const
108  {
109  message_cache_container::index<message_hash_index>::type::const_iterator iter =
110  _message_cache.get<message_hash_index>().find(hash_of_message_to_lookup );
111  if( iter != _message_cache.get<message_hash_index>().end() )
112  return iter->message_body;
113  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
114  }
115 
117  const message_hash_type& hash_of_msg_contents_to_lookup ) const
118  {
119  if( hash_of_msg_contents_to_lookup != message_hash_type() )
120  {
121  message_cache_container::index<message_contents_hash_index>::type::const_iterator iter =
122  _message_cache.get<message_contents_hash_index>().find(hash_of_msg_contents_to_lookup );
123  if( iter != _message_cache.get<message_contents_hash_index>().end() )
124  return iter->propagation_data;
125  }
126  FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
127  }
128 
130  {
131 #ifdef P2P_IN_DEDICATED_THREAD
132  std::weak_ptr<fc::thread> weak_thread;
133  if (impl_to_delete)
134  {
135  std::shared_ptr<fc::thread> impl_thread(impl_to_delete->_thread);
136  weak_thread = impl_thread;
137  impl_thread->async([impl_to_delete](){ delete impl_to_delete; }, "delete node_impl").wait();
138  dlog("deleting the p2p thread");
139  }
140  if (weak_thread.expired())
141  dlog("done deleting the p2p thread");
142  else
143  dlog("failed to delete the p2p thread, we must be leaking a smart pointer somewhere");
144 #else // P2P_IN_DEDICATED_THREAD
145  delete impl_to_delete;
146 #endif // P2P_IN_DEDICATED_THREAD
147  }
148 
149 #ifdef P2P_IN_DEDICATED_THREAD
150 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
151 #else
152 # define VERIFY_CORRECT_THREAD() do {} while (0)
153 #endif
154 
156  static void greatly_delay_next_conn_to( node_impl* impl, const fc::ip::endpoint& ep )
157  {
158  dlog( "Greatly delaying the next connection to endpoint ${ep}", ("ep", ep) );
159  fc::optional<potential_peer_record> updated_peer_record
160  = impl->_potential_peer_db.lookup_entry_for_endpoint( ep );
161  if( updated_peer_record )
162  {
163  updated_peer_record->last_connection_disposition = last_connection_rejected;
164  updated_peer_record->last_connection_attempt_time = fc::time_point::now();
165  constexpr uint32_t failed_attempts_to_add = 120; // * 30 seconds = 1 hour
166  updated_peer_record->number_of_failed_connection_attempts += failed_attempts_to_add;
167  impl->_potential_peer_db.update_entry( *updated_peer_record );
168  }
169  }
171  static void save_successful_address( node_impl* impl, const fc::ip::endpoint& ep )
172  {
173  dlog( "Saving successfully connected endpoint ${ep} to peer database", ("ep", ep) );
174  auto updated_peer_record = impl->_potential_peer_db.lookup_or_create_entry_for_ep( ep );
175  updated_peer_record.last_connection_disposition = last_connection_succeeded;
176  updated_peer_record.last_connection_attempt_time = fc::time_point::now();
177  // halve number_of_failed_connection_attempts
178  constexpr uint16_t two = 2;
179  updated_peer_record.number_of_failed_connection_attempts /= two;
180  updated_peer_record.last_seen_time = fc::time_point::now();
181  impl->_potential_peer_db.update_entry(updated_peer_record);
182  }
183  static void update_address_seen_time( node_impl* impl, const peer_connection* active_peer )
184  {
185  fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
186  if( inbound_endpoint.valid() && inbound_endpoint->port() != 0 )
187  {
188  fc::optional<potential_peer_record> updated_peer_record
189  = impl->_potential_peer_db.lookup_entry_for_endpoint( *inbound_endpoint );
190  if( updated_peer_record )
191  {
192  updated_peer_record->last_seen_time = fc::time_point::now();
193  impl->_potential_peer_db.update_entry( *updated_peer_record );
194  }
195  }
196  }
197  static void update_address_seen_time( node_impl* impl, const peer_connection_ptr& active_peer )
198  {
199  update_address_seen_time( impl, active_peer.get() );
200  }
201 
204  {
205  public:
206  fc::flat_set<fc::ip::endpoint> list;
207 
208  explicit generic_list_address_builder(const std::vector<std::string>& address_list)
209  {
210  FC_ASSERT( !address_list.empty(), "The peer node list must not be empty" );
211 
212  std::for_each( address_list.begin(), address_list.end(), [&list = list]( const std::string& str )
213  {
214  // ignore fc exceptions (like poorly formatted endpoints)
215  try
216  {
217  list.insert( fc::ip::endpoint::from_string(str) );
218  }
219  catch(const fc::exception& )
220  {
221  wlog( "Address ${addr} invalid.", ("addr", str) );
222  }
223  } );
224  }
225  };
226 
227  /******
228  * Use information passed from command line or config file to advertise nodes
229  */
231  {
232  public:
233  using generic_list_address_builder::generic_list_address_builder;
234 
235  bool should_advertise( const fc::ip::endpoint& in ) const override
236  {
237  return !( list.find(in) == list.end() );
238  }
239  };
240 
241  /****
242  * Advertise all nodes except a predefined list
243  */
245  {
246  public:
247  using generic_list_address_builder::generic_list_address_builder;
248 
249  bool should_advertise( const fc::ip::endpoint& in ) const override
250  {
251  return ( list.find( in ) == list.end() );
252  }
253  };
254 
255  /***
256  * Return all peers when node asks
257  */
259  {
260  bool should_advertise( const fc::ip::endpoint& in ) const override
261  {
262  return true;
263  }
264  };
265 
266  std::shared_ptr<node_impl::address_builder> node_impl::address_builder::create_default_address_builder()
267  {
268  return std::make_shared<all_address_builder>();
269  }
270 
271  void node_impl::address_builder::build(node_impl* impl, address_message& reply) const
272  {
273  reply.addresses.clear();
274  reply.addresses.reserve( impl->_active_connections.size() );
277  // only pass those that are allowed to advertise AND we are connected to
278  for( const peer_connection_ptr& active_peer : impl->_active_connections )
279  {
280  // Note:
281  // * We want to advertise the peer's inbound endpoint, but not necessarily the remote endpoint.
282  // * If the peer's inbound port is 0, we still advertise it so that observers know about it.
283  // The peer is marked as "firewalled", so peers running older versions should be able to handle it too.
284  //
285  // If it is an outbound connection, we know that the remote endpoint works (at least for us),
286  // and we have assigned it to the inbound endpoint, so just use either of them.
287  // If it is an inbound connection, we just advertise what we have.
288  fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
289  if( inbound_endpoint.valid() && should_advertise( *inbound_endpoint ) )
290  {
291  update_address_seen_time( impl, active_peer );
292  reply.addresses.emplace_back( *inbound_endpoint,
293  now,
294  active_peer->round_trip_delay,
295  active_peer->node_id,
296  active_peer->direction,
297  active_peer->is_firewalled );
298  }
299  }
300  }
301 
302  node_impl::node_impl(const std::string& user_agent) :
303  _user_agent_string(user_agent)
304  {
306  // Note: this means that the node gets a new node_id every time it restarts
307  fc::rand_bytes((char*) _node_id.data(), (int)_node_id.size());
308  }
309 
311  {
313  ilog( "cleaning up node" );
314  _node_is_shutting_down = true;
315 
316  {
318  for (const peer_connection_ptr& active_peer : _active_connections)
319  {
320  update_address_seen_time( this, active_peer );
321  }
322  }
323 
324  try
325  {
326  ilog( "close" );
327  close();
328  }
329  catch ( const fc::exception& e )
330  {
331  wlog( "unexpected exception on close ${e}", ("e", e) );
332  }
333  ilog( "done" );
334  }
335 
337  {
339 
341  try
342  {
345  fc::json::save_to_file( _node_configuration, configuration_file_name );
346  dlog( "Saved node configuration to file ${filename}", ( "filename", configuration_file_name ) );
347  }
348  catch (const fc::canceled_exception&)
349  {
350  throw;
351  }
352  catch ( const fc::exception& except )
353  {
354  wlog( "error writing node configuration to file ${filename}: ${error}",
355  ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
356  }
357  }
358 
360  {
363  {
364  try
365  {
366  dlog("Starting an iteration of p2p_network_connect_loop().");
368 
369  // add-once peers bypass our checks on the maximum/desired number of connections
370  // (but they will still be counted against the totals once they're connected)
371  if (!_add_once_node_list.empty())
372  {
373  std::list<potential_peer_record> add_once_node_list;
374  add_once_node_list.swap(_add_once_node_list);
375  dlog("Processing \"add once\" node list containing ${count} peers:",
376  ("count", add_once_node_list.size()));
377  for (const potential_peer_record& add_once_peer : add_once_node_list)
378  {
379  dlog(" ${peer}", ("peer", add_once_peer.endpoint));
380  }
381  for (const potential_peer_record& add_once_peer : add_once_node_list)
382  {
383  // If we have an existing connection to that peer, skip it.
384  peer_connection_ptr existing_connection_ptr = get_connection_for_endpoint( add_once_peer.endpoint );
385  if(!existing_connection_ptr)
386  connect_to_endpoint(add_once_peer.endpoint);
387  }
388  dlog("Done processing \"add once\" node list");
389  }
390 
392  {
393  bool initiated_connection_this_pass = false;
395 
398  ++iter)
399  {
400  fc::microseconds delay_until_retry = fc::seconds( (iter->number_of_failed_connection_attempts + 1)
402 
403  bool last_connection_not_ok = ( iter->last_connection_disposition == last_connection_failed ||
404  iter->last_connection_disposition == last_connection_rejected ||
405  iter->last_connection_disposition == last_connection_handshaking_failed );
406 
407  if( !is_connected_to_endpoint( iter->endpoint )
408  && ( !last_connection_not_ok
409  || ( fc::time_point::now() - iter->last_connection_attempt_time ) > delay_until_retry ) )
410  {
411  connect_to_endpoint(iter->endpoint);
412  initiated_connection_this_pass = true;
413  }
414  }
415 
416  if (!initiated_connection_this_pass && !_potential_peer_db_updated)
417  break;
418  }
419 
421 
422  // if we broke out of the while loop, that means either we have connected to enough nodes, or
423  // we don't have any good candidates to connect to right now.
424 #if 0
425  try
426  {
427  _retrigger_connect_loop_promise = fc::promise<void>::create("graphene::net::retrigger_connect_loop");
429  {
431  dlog( "Still want to connect to more nodes, but I don't have any good candidates. Trying again in 15 seconds" );
432  else
433  dlog( "I still have some \"add once\" nodes to connect to. Trying again in 15 seconds" );
435  }
436  else
437  {
438  dlog( "I don't need any more connections, waiting forever until something changes" );
440  }
441  }
442  catch ( fc::timeout_exception& ) //intentionally not logged
443  {
444  } // catch
445 #else
446  fc::usleep(fc::seconds(10));
447 #endif
448  }
449  catch (const fc::canceled_exception&)
450  {
451  ilog( "p2p_network_connect_loop canceled" );
452  throw;
453  }
454  FC_CAPTURE_AND_LOG( (0) ) // GCOVR_EXCL_LINE
455  }// while !canceled
456  }
457 
459  {
461  dlog( "Triggering connect loop now" );
463  //if( _retrigger_connect_loop_promise )
464  // _retrigger_connect_loop_promise->set_value();
465  }
466 
468  {
470 
471  try
472  {
473  ilog("Starting an iteration of update_seed_nodes loop.");
474  for( const std::string& endpoint_string : _seed_nodes )
475  {
476  resolve_seed_node_and_add( endpoint_string );
477  }
478  ilog("Done an iteration of update_seed_nodes loop.");
479  }
480  catch (const fc::canceled_exception&)
481  {
482  ilog( "update_seed_nodes_task canceled" );
483  throw;
484  }
485  FC_CAPTURE_AND_LOG( (_seed_nodes) ) // GCOVR_EXCL_LINE
486 
488  }
489 
491  {
493 
495  return;
496 
498  return;
499 
500  constexpr uint32_t five = 5;
501  auto interval = _active_connections.empty() ? fc::minutes(five) : fc::hours(1);
503  fc::time_point::now() + interval,
504  "update_seed_nodes_loop" );
505  }
506 
508  {
510  return std::find_if(_received_sync_items.begin(), _received_sync_items.end(),
511  [&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _received_sync_items.end() ||
512  std::find_if(_new_received_sync_items.begin(), _new_received_sync_items.end(),
513  [&item_hash]( const graphene::net::block_message& message ) { return message.block_id == item_hash; } ) != _new_received_sync_items.end(); ;
514  }
515 
516  void node_impl::request_sync_item_from_peer( const peer_connection_ptr& peer, const item_hash_t& item_to_request )
517  {
519  dlog( "requesting item ${item_hash} from peer ${endpoint}", ("item_hash", item_to_request )("endpoint", peer->get_remote_endpoint() ) );
520  item_id item_id_to_request( graphene::net::block_message_type, item_to_request );
521  _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
522  peer->last_sync_item_received_time = fc::time_point::now();
523  peer->sync_items_requested_from_peer.insert(item_to_request);
524  peer->send_message( fetch_items_message(item_id_to_request.item_type, std::vector<item_hash_t>{item_id_to_request.item_hash} ) );
525  }
526 
527  void node_impl::request_sync_items_from_peer( const peer_connection_ptr& peer, const std::vector<item_hash_t>& items_to_request )
528  {
530  dlog( "requesting ${item_count} item(s) ${items_to_request} from peer ${endpoint}",
531  ("item_count", items_to_request.size())("items_to_request", items_to_request)("endpoint", peer->get_remote_endpoint()) );
532  for (const item_hash_t& item_to_request : items_to_request)
533  {
534  _active_sync_requests.insert( active_sync_requests_map::value_type(item_to_request, fc::time_point::now() ) );
535  peer->last_sync_item_received_time = fc::time_point::now();
536  peer->sync_items_requested_from_peer.insert(item_to_request);
537  }
538  peer->send_message(fetch_items_message(graphene::net::block_message_type, items_to_request));
539  }
540 
542  {
545  {
547  dlog( "beginning another iteration of the sync items loop" );
548 
550  {
551  std::map<peer_connection_ptr, std::vector<item_hash_t> > sync_item_requests_to_send;
552 
553  {
554  std::set<item_hash_t> sync_items_to_request;
555 
556  // for each idle peer that we're syncing with
558  for( const peer_connection_ptr& peer : _active_connections )
559  {
560  if( peer->we_need_sync_items_from_peer &&
561  // if we've already scheduled a request for this peer, don't consider scheduling another
562  sync_item_requests_to_send.find(peer) == sync_item_requests_to_send.end() &&
563  peer->idle() )
564  {
565  if (!peer->inhibit_fetching_sync_blocks)
566  {
567  // loop through the items it has that we don't yet have on our blockchain
568  for( const auto& item_to_potentially_request : peer->ids_of_items_to_get )
569  {
570  // if we don't already have this item in our temporary storage
571  // and we haven't requested from another syncing peer
572  if( // already got it, but for some reson it's still in our list of items to fetch
573  !have_already_received_sync_item(item_to_potentially_request) &&
574  // we have already decided to request it from another peer during this iteration
575  sync_items_to_request.find(item_to_potentially_request) == sync_items_to_request.end() &&
576  // we've requested it in a previous iteration and we're still waiting for it to arrive
577  _active_sync_requests.find(item_to_potentially_request) == _active_sync_requests.end() )
578  {
579  // then schedule a request from this peer
580  sync_item_requests_to_send[peer].push_back(item_to_potentially_request);
581  sync_items_to_request.insert( item_to_potentially_request );
582  if (sync_item_requests_to_send[peer].size() >= _max_sync_blocks_per_peer)
583  break;
584  }
585  }
586  }
587  }
588  }
589  } // end non-preemptable section
590 
591  // make all the requests we scheduled in the loop above
592  for( auto sync_item_request : sync_item_requests_to_send )
593  request_sync_items_from_peer( sync_item_request.first, sync_item_request.second );
594  sync_item_requests_to_send.clear();
595  }
596  else
597  dlog("fetch_sync_items_loop is suspended pending backlog processing");
598 
600  {
601  dlog( "no sync items to fetch right now, going to sleep" );
603  = fc::promise<void>::create("graphene::net::retrigger_fetch_sync_items_loop");
606  }
607  } // while( !canceled )
608  }
609 
611  {
613  dlog( "Triggering fetch sync items loop now" );
617  }
618 
620  {
622  for( const peer_connection_ptr& peer : _active_connections )
623  {
624  if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
625  return true;
626  }
627  return false;
628  }
629 
631  {
634  {
635  _items_to_fetch_updated = false;
636  dlog("beginning an iteration of fetch items (${count} items to fetch)",
637  ("count", _items_to_fetch.size()));
638 
639  fc::time_point oldest_timestamp_to_fetch = fc::time_point::now()
641  fc::time_point next_peer_unblocked_time = fc::time_point::maximum();
642 
643  // we need to construct a list of items to request from each peer first,
644  // then send the messages (in two steps, to avoid yielding while iterating)
645  // we want to evenly distribute our requests among our peers.
646  struct requested_item_count_index {};
647  struct peer_and_items_to_fetch
648  {
649  peer_connection_ptr peer;
650  std::vector<item_id> item_ids;
651  peer_and_items_to_fetch(const peer_connection_ptr& peer) : peer(peer) {}
652  bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; }
653  size_t number_of_items() const { return item_ids.size(); }
654  };
655  using fetch_messages_to_send_set = boost::multi_index_container< peer_and_items_to_fetch, bmi::indexed_by<
656  bmi::ordered_unique<
657  bmi::member<peer_and_items_to_fetch, peer_connection_ptr, &peer_and_items_to_fetch::peer> >,
658  bmi::ordered_non_unique< bmi::tag<requested_item_count_index>,
659  bmi::const_mem_fun<peer_and_items_to_fetch, size_t, &peer_and_items_to_fetch::number_of_items> >
660  > >;
661  fetch_messages_to_send_set items_by_peer;
662 
663  // initialize the fetch_messages_to_send with an empty set of items for all idle peers
664  {
666  for (const peer_connection_ptr& peer : _active_connections)
667  if (peer->idle())
668  items_by_peer.insert(peer_and_items_to_fetch(peer));
669  }
670 
671  // now loop over all items we want to fetch
672  for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
673  {
674  if (item_iter->timestamp < oldest_timestamp_to_fetch)
675  {
676  // this item has probably already fallen out of our peers' caches, we'll just ignore it.
677  // this can happen during flooding, and the _items_to_fetch could otherwise get clogged
678  // with a bunch of items that we'll never be able to request from any peer
679  wlog("Unable to fetch item ${item} before its likely expiration time, "
680  "removing it from our list of items to fetch",
681  ("item", item_iter->item));
682  item_iter = _items_to_fetch.erase(item_iter);
683  }
684  else
685  {
686  // find a peer that has it, we'll use the one who has the least requests going to it to load balance
687  bool item_fetched = false;
688  for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
689  {
690  const peer_connection_ptr& peer = peer_iter->peer;
691  // if they have the item and we haven't already decided to ask them for too many other items
692  if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
693  peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
694  {
695  if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
696  next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
697  else
698  {
699  //dlog("requesting item ${hash} from peer ${endpoint}",
700  // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
701  item_id item_id_to_fetch = item_iter->item;
702  peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(
703  item_id_to_fetch, fc::time_point::now()));
704  item_iter = _items_to_fetch.erase(item_iter);
705  item_fetched = true;
706  items_by_peer.get<requested_item_count_index>().modify(peer_iter,
707  [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
708  peer_and_items.item_ids.push_back(item_id_to_fetch);
709  });
710  break;
711  }
712  }
713  }
714  if (!item_fetched)
715  ++item_iter;
716  }
717  }
718 
719  // we've figured out which peer will be providing each item, now send the messages.
720  for (const peer_and_items_to_fetch& peer_and_items : items_by_peer)
721  {
722  // the item lists are heterogenous and
723  // the fetch_items_message can only deal with one item type at a time.
724  std::map<uint32_t, std::vector<item_hash_t> > items_to_fetch_by_type;
725  for (const item_id& item : peer_and_items.item_ids)
726  items_to_fetch_by_type[item.item_type].push_back(item.item_hash);
727  for (auto& items_by_type : items_to_fetch_by_type)
728  {
729  dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}",
730  ("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first)
731  ("endpoint", peer_and_items.peer->get_remote_endpoint())
732  ("hashes", items_by_type.second));
733  peer_and_items.peer->send_message(fetch_items_message(items_by_type.first,
734  items_by_type.second));
735  }
736  }
737  items_by_peer.clear();
738 
740  {
741  _retrigger_fetch_item_loop_promise = fc::promise<void>::create("graphene::net::retrigger_fetch_item_loop");
742  fc::microseconds time_until_retrigger = fc::microseconds::maximum();
743  if (next_peer_unblocked_time != fc::time_point::maximum())
744  time_until_retrigger = next_peer_unblocked_time - fc::time_point::now();
745  try
746  {
747  if (time_until_retrigger > fc::microseconds(0))
748  _retrigger_fetch_item_loop_promise->wait(time_until_retrigger);
749  }
750  catch (const fc::timeout_exception&)
751  {
752  dlog("Resuming fetch_items_loop due to timeout -- one of our peers should no longer be throttled");
753  }
755  }
756  } // while !canceled
757  }
758 
760  {
765  }
766 
768  {
771  {
772  dlog("beginning an iteration of advertise inventory");
773  // swap inventory into local variable, clearing the node's copy
774  std::unordered_set<item_id> inventory_to_advertise;
775  _new_inventory.swap( inventory_to_advertise );
776 
777  // process all inventory to advertise and construct the inventory messages we'll send
778  // first, then send them all in a batch (to avoid any fiber interruption points while
779  // we're computing the messages)
780  std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
781  {
783  for (const peer_connection_ptr& peer : _active_connections)
784  {
785  // only advertise to peers who are in sync with us
786  //idump((peer->peer_needs_sync_items_from_us)); // for debug
787  if( !peer->peer_needs_sync_items_from_us )
788  {
789  std::map<uint32_t, std::vector<item_hash_t> > items_to_advertise_by_type;
790  // don't send the peer anything we've already advertised to it
791  // or anything it has advertised to us
792  // group the items we need to send by type, because we'll need to send one inventory message per type
793  size_t total_items_to_send = 0;
794  //idump((inventory_to_advertise)); // for debug
795  for (const item_id& item_to_advertise : inventory_to_advertise)
796  {
797  auto adv_to_peer = peer->inventory_advertised_to_peer.find(item_to_advertise);
798  auto adv_to_us = peer->inventory_peer_advertised_to_us.find(item_to_advertise);
799 
800  if (adv_to_peer == peer->inventory_advertised_to_peer.end() &&
801  adv_to_us == peer->inventory_peer_advertised_to_us.end())
802  {
803  items_to_advertise_by_type[item_to_advertise.item_type].push_back(item_to_advertise.item_hash);
804  peer->inventory_advertised_to_peer.insert(
806  ++total_items_to_send;
807  if (item_to_advertise.item_type == trx_message_type)
808  testnetlog("advertising transaction ${id} to peer ${endpoint}",
809  ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
810  dlog("advertising item ${id} to peer ${endpoint}",
811  ("id", item_to_advertise.item_hash)("endpoint", peer->get_remote_endpoint()));
812  }
813  else
814  {
815  if( adv_to_peer != peer->inventory_advertised_to_peer.end() )
816  dlog( "adv_to_peer != peer->inventory_advertised_to_peer.end() : ${adv_to_peer}",
817  ("adv_to_peer", *adv_to_peer) );
818  if( adv_to_us != peer->inventory_peer_advertised_to_us.end() )
819  dlog( "adv_to_us != peer->inventory_peer_advertised_to_us.end() : ${adv_to_us}",
820  ("adv_to_us", *adv_to_us) );
821  }
822  }
823  dlog("advertising ${count} new item(s) of ${types} type(s) to peer ${endpoint}",
824  ("count", total_items_to_send)
825  ("types", items_to_advertise_by_type.size())
826  ("endpoint", peer->get_remote_endpoint()));
827  for (auto items_group : items_to_advertise_by_type)
828  {
829  inventory_messages_to_send.emplace_back(std::make_pair(
830  peer, item_ids_inventory_message(items_group.first, items_group.second)));
831  }
832  }
833  peer->clear_old_inventory();
834  }
835  } // lock_guard
836 
837  for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
838  iter->first->send_message(iter->second);
839  inventory_messages_to_send.clear();
840 
841  if (_new_inventory.empty())
842  {
844  = fc::promise<void>::create("graphene::net::retrigger_advertise_inventory_loop");
847  }
848  } // while(!canceled)
849  }
850 
852  {
856  }
857 
859  {
861  std::list<peer_connection_ptr> peers_to_disconnect_gently;
862  std::list<peer_connection_ptr> peers_to_disconnect_forcibly;
863  std::list<peer_connection_ptr> peers_to_send_keep_alive;
864  std::list<peer_connection_ptr> peers_to_terminate;
865 
866  try {
867  // Note: if the node is shutting down, it's possible that _delegate is already unusable,
868  // in this case, we'll get an exception
869  _recent_block_interval_seconds = _delegate->get_current_block_interval_in_seconds();
870 
871  // Disconnect peers that haven't sent us any data recently
872  // These numbers are just guesses and we need to think through how this works better.
873  // If we and our peers get disconnected from the rest of the network, we will not
874  // receive any blocks or transactions from the rest of the world, and that will
875  // probably make us disconnect from our peers even though we have working connections to
876  // them (but they won't have sent us anything since they aren't getting blocks either).
877  // This might not be so bad because it could make us initiate more connections and
878  // reconnect with the rest of the network, or it might just futher isolate us.
879  // As usual, the first step is to walk through all our peers and figure out which
880  // peers need action (disconneting, sending keepalives, etc), then we walk through
881  // those lists yielding at our leisure later.
882 
883  uint32_t handshaking_timeout = _peer_inactivity_timeout;
884  fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout);
885  {
887  for( const peer_connection_ptr& handshaking_peer : _handshaking_connections )
888  {
889  if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
890  handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
891  handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
892  {
893  wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds",
894  ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) );
895  wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
896  ("status", handshaking_peer->negotiation_status)
897  ("sent", handshaking_peer->get_total_bytes_sent())
898  ("received", handshaking_peer->get_total_bytes_received()));
899  handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn,
900  "Terminating handshaking connection due to inactivity of ${timeout} seconds. Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}",
901  ("peer", handshaking_peer->get_remote_endpoint())
902  ("timeout", handshaking_timeout)
903  ("status", handshaking_peer->negotiation_status)
904  ("sent", handshaking_peer->get_total_bytes_sent())
905  ("received", handshaking_peer->get_total_bytes_received())));
906  peers_to_disconnect_forcibly.push_back( handshaking_peer );
907  } // if
908  } // for
909  } // scoped_lock
910  // timeout for any active peers is two block intervals
911  uint32_t active_disconnect_timeout = 10 * _recent_block_interval_seconds;
912  uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2;
913 
914  // set the ignored request time out to 6 second. When we request a block
915  // or transaction from a peer, this timeout determines how long we wait for them
916  // to reply before we give up and ask another peer for the item.
917  // Ideally this should be significantly shorter than the block interval, because
918  // we'd like to realize the block isn't coming and fetch it from a different
919  // peer before the next block comes in.
920  // Increased to 6 from 1 in #1660 due to heavy load. May need to adjust further
921  // Note: #1660 is https://github.com/steemit/steem/issues/1660
922  fc::microseconds active_ignored_request_timeout = fc::seconds(6);
923 
924  fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout);
925  fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout);
926  fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout;
927  {
929 
930  for( const peer_connection_ptr& active_peer : _active_connections )
931  {
932  if( active_peer->connection_initiation_time < active_disconnect_threshold &&
933  active_peer->get_last_message_received_time() < active_disconnect_threshold )
934  {
935  wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds",
936  ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) );
937  peers_to_disconnect_gently.push_back( active_peer );
938  }
939  else
940  {
941  bool disconnect_due_to_request_timeout = false;
942  if (!active_peer->sync_items_requested_from_peer.empty() &&
943  active_peer->last_sync_item_received_time < active_ignored_request_threshold)
944  {
945  wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests",
946  ("peer", active_peer->get_remote_endpoint())("count",
947  active_peer->sync_items_requested_from_peer.size()));
948  disconnect_due_to_request_timeout = true;
949  }
950  if (!disconnect_due_to_request_timeout &&
951  active_peer->item_ids_requested_from_peer &&
952  active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold)
953  {
954  wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}",
955  ("peer", active_peer->get_remote_endpoint())
956  ("synopsis", active_peer->item_ids_requested_from_peer->get<0>()));
957  disconnect_due_to_request_timeout = true;
958  }
959  if (!disconnect_due_to_request_timeout)
960  for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer)
961  if (item_and_time.second < active_ignored_request_threshold)
962  {
963  wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}",
964  ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash));
965  disconnect_due_to_request_timeout = true;
966  break;
967  }
968  if (disconnect_due_to_request_timeout)
969  {
970  // we should probably disconnect nicely and give them a reason, but right now the logic
971  // for rescheduling the requests only executes when the connection is fully closed,
972  // and we want to get those requests rescheduled as soon as possible
973  peers_to_disconnect_forcibly.push_back(active_peer);
974  }
975  else if (active_peer->connection_initiation_time < active_send_keepalive_threshold &&
976  active_peer->get_last_message_received_time() < active_send_keepalive_threshold)
977  {
978  wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds",
979  ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) );
980  peers_to_send_keep_alive.push_back(active_peer);
981  }
982  else if (active_peer->we_need_sync_items_from_peer &&
983  !active_peer->is_currently_handling_message() &&
984  !active_peer->item_ids_requested_from_peer &&
985  active_peer->ids_of_items_to_get.empty())
986  {
987  // This is a state we should never get into in the first place, but if we do, we should disconnect the peer
988  // to re-establish the connection.
989  fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
990  ("peer", active_peer->get_remote_endpoint()));
991  wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
992  ("peer", active_peer->get_remote_endpoint()));
993  peers_to_disconnect_forcibly.push_back(active_peer);
994  }
995  } // else
996  } // for
997  } // scoped_lock
998 
1000  {
1002  for( const peer_connection_ptr& closing_peer : _closing_connections )
1003  {
1004  if( closing_peer->connection_closed_time < closing_disconnect_threshold )
1005  {
1006  // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT
1007  // seconds ago, but they haven't done it yet. Terminate the connection now
1008  wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
1009  ( "peer", closing_peer->get_remote_endpoint() ) );
1010  peers_to_disconnect_forcibly.push_back( closing_peer );
1011  }
1012  } // for
1013  } // scoped_lock
1014  uint32_t failed_terminate_timeout_seconds = 120;
1015  fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds);
1016  {
1018  for (const peer_connection_ptr& peer : _terminating_connections )
1019  {
1020  if (peer->get_connection_terminated_time() != fc::time_point::min() &&
1021  peer->get_connection_terminated_time() < failed_terminate_threshold)
1022  {
1023  wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
1024  peers_to_terminate.push_back(peer);
1025  }
1026  }
1027  } // scoped_lock
1028  // That's the end of the sorting step; now all peers that require further processing are now in one of the
1029  // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate
1030 
1031  // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield,
1032  // and once we start yielding, we may find that we've moved that peer to another list (closed or active)
1033  // and that triggers assertions, maybe even errors
1034  {
1036  for (const peer_connection_ptr& peer : peers_to_terminate )
1037  {
1041  }
1042  } // scoped_lock
1043  peers_to_terminate.clear();
1044 
1045  // if we're going to abruptly disconnect anyone, do it here
1046  // (it doesn't yield). I don't think there would be any harm if this were
1047  // moved to the yielding section
1048  for( const peer_connection_ptr& peer : peers_to_disconnect_forcibly )
1049  {
1051  peer->close_connection();
1052  }
1053  peers_to_disconnect_forcibly.clear();
1054 
1055  // Now process the peers that we need to do yielding functions with (disconnect sends a message with the
1056  // disconnect reason, so it may yield)
1057  for( const peer_connection_ptr& peer : peers_to_disconnect_gently )
1058  {
1059  {
1061  fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity",
1062  ( "last_message_received_seconds_ago", (peer->get_last_message_received_time()
1063  - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
1064  ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time()
1065  - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
1066  ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end()
1068  disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error );
1069  }
1070  }
1071  peers_to_disconnect_gently.clear();
1072 
1073  for( const peer_connection_ptr& peer : peers_to_send_keep_alive )
1074  peer->send_message(current_time_request_message(),
1075  offsetof(current_time_request_message, request_sent_time));
1076  peers_to_send_keep_alive.clear();
1077 
1078  } catch( const fc::exception& e ) {
1079  wlog( "Exception caught in kill_inactive_conns_loop: ${e}", ("e",e.to_detail_string()) );
1080  // If the node is shutting down, we just quit, no need to throw.
1081  // If the node is not shutting down, the old code will throw, which means we won't schedule a new loop,
1082  // likely it's unexpected behavior.
1083  // Thus we don't throw here.
1084  }
1085 
1087  {
1089  [this,self](){ kill_inactive_conns_loop(self); },
1091  "kill_inactive_conns_loop" );
1092  }
1093  }
1094 
1096  {
1098  {
1100  // JMJ 2018-10-22 Unsure why we're making a copy here, but this is probably unnecessary
1101  std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(), _active_connections.end());
1102  for( const peer_connection_ptr& active_peer : original_active_peers )
1103  {
1104  try
1105  {
1106  active_peer->expecting_address_message = true;
1107  active_peer->send_message(address_request_message());
1108  }
1109  catch ( const fc::canceled_exception& )
1110  {
1111  throw;
1112  }
1113  catch (const fc::exception& e)
1114  {
1115  dlog("Caught exception while sending address request message to peer ${peer} : ${e}",
1116  ("peer", active_peer->get_remote_endpoint())("e", e));
1117  }
1118  }
1119  }
1120 
1121  // this has nothing to do with updating the peer list, but we need to prune this list
1122  // at regular intervals, this is a fine place to do it.
1123  fc::time_point_sec oldest_failed_ids_to_keep(fc::time_point::now() - fc::minutes(15));
1124  auto oldest_failed_ids_to_keep_iter = _recently_failed_items.get<peer_connection::timestamp_index>()
1125  .lower_bound(oldest_failed_ids_to_keep);
1126  auto begin_iter = _recently_failed_items.get<peer_connection::timestamp_index>().begin();
1128  .erase(begin_iter, oldest_failed_ids_to_keep_iter);
1129 
1133  "fetch_updated_peer_lists_loop" );
1134  }
1135  void node_impl::update_bandwidth_data(uint32_t bytes_read_this_second, uint32_t bytes_written_this_second)
1136  {
1138  _avg_net_read_speed_seconds.push_back(bytes_read_this_second);
1139  _avg_net_write_speed_seconds.push_back(bytes_written_this_second);
1141  constexpr uint8_t seconds_per_minute = 60;
1142  constexpr uint8_t minutes_per_hour = 60;
1143  if (_avg_net_usage_second_counter >= seconds_per_minute)
1144  {
1147  uint32_t average_read_this_minute = (uint32_t)boost::accumulate(_avg_net_read_speed_seconds, uint64_t(0))
1148  / (uint32_t)_avg_net_read_speed_seconds.size();
1149  _avg_net_read_speed_minutes.push_back(average_read_this_minute);
1150  uint32_t average_written_this_minute = (uint32_t)boost::accumulate(_avg_net_write_speed_seconds, uint64_t(0))
1151  / (uint32_t)_avg_net_write_speed_seconds.size();
1152  _avg_net_write_speed_minutes.push_back(average_written_this_minute);
1153  if (_avg_net_usage_minute_counter >= minutes_per_hour)
1154  {
1156  uint32_t average_read_this_hour = (uint32_t)boost::accumulate(_avg_net_read_speed_minutes, uint64_t(0))
1157  / (uint32_t)_avg_net_read_speed_minutes.size();
1158  _avg_net_read_speed_hours.push_back(average_read_this_hour);
1159  uint32_t average_written_this_hour = (uint32_t)boost::accumulate(_avg_net_write_speed_minutes, uint64_t(0))
1160  / (uint32_t)_avg_net_write_speed_minutes.size();
1161  _avg_net_write_speed_hours.push_back(average_written_this_hour);
1162  }
1163  }
1164  }
1166  {
1168  fc::time_point_sec current_time = fc::time_point::now();
1169 
1171  _bandwidth_monitor_last_update_time = current_time;
1172 
1173  uint32_t seconds_since_last_update = current_time.sec_since_epoch()
1175  seconds_since_last_update = std::max(UINT32_C(1), seconds_since_last_update);
1176  uint32_t bytes_read_this_second = _rate_limiter.get_actual_download_rate();
1177  uint32_t bytes_written_this_second = _rate_limiter.get_actual_upload_rate();
1178  for (uint32_t i = 0; i < seconds_since_last_update - 1; ++i)
1179  update_bandwidth_data(0, 0);
1180  update_bandwidth_data(bytes_read_this_second, bytes_written_this_second);
1181  _bandwidth_monitor_last_update_time = current_time;
1182 
1186  "bandwidth_monitor_loop" );
1187  }
1188 
1190  {
1192  dump_node_status();
1196  "dump_node_status_task");
1197  }
1198 
1200  {
1202 #ifdef USE_PEERS_TO_DELETE_MUTEX
1203  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
1204  dlog("in delayed_peer_deletion_task with ${count} in queue", ("count", _peers_to_delete.size()));
1205  _peers_to_delete.clear();
1206  dlog("_peers_to_delete cleared");
1207 #else
1208  while (!_peers_to_delete.empty())
1209  {
1210  std::list<peer_connection_ptr> peers_to_delete_copy;
1211  dlog("beginning an iteration of delayed_peer_deletion_task with ${count} in queue",
1212  ("count", _peers_to_delete.size()));
1213  peers_to_delete_copy.swap(_peers_to_delete);
1214  }
1215  dlog("leaving delayed_peer_deletion_task");
1216 #endif
1217  }
1218 
1220  {
1222 
1223  assert(_handshaking_connections.find(peer_to_delete) == _handshaking_connections.end());
1224  assert(_active_connections.find(peer_to_delete) == _active_connections.end());
1225  assert(_closing_connections.find(peer_to_delete) == _closing_connections.end());
1226  assert(_terminating_connections.find(peer_to_delete) == _terminating_connections.end());
1227 
1228 #ifdef USE_PEERS_TO_DELETE_MUTEX
1229  dlog("scheduling peer for deletion: ${peer} (may block on a mutex here)",
1230  ("peer", peer_to_delete->get_remote_endpoint()));
1231 
1232  size_t number_of_peers_to_delete;
1233  {
1234  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
1235  _peers_to_delete.emplace_back(peer_to_delete);
1236  number_of_peers_to_delete = _peers_to_delete.size();
1237  }
1238  dlog("peer scheduled for deletion: ${peer}", ("peer", peer_to_delete->get_remote_endpoint()));
1239 
1240  if (!_node_is_shutting_down &&
1242  {
1243  dlog("asyncing delayed_peer_deletion_task to delete ${size} peers",
1244  ("size", number_of_peers_to_delete));
1246  "delayed_peer_deletion_task" );
1247  }
1248  else
1249  dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})",
1250  ("size", number_of_peers_to_delete));
1251 #else
1252  dlog("scheduling peer for deletion: ${peer} (this will not block)",
1253  ("peer", peer_to_delete->get_remote_endpoint()));
1254  _peers_to_delete.push_back(peer_to_delete);
1255  if (!_node_is_shutting_down &&
1257  {
1258  dlog("asyncing delayed_peer_deletion_task to delete ${size} peers", ("size", _peers_to_delete.size()));
1260  "delayed_peer_deletion_task" );
1261  }
1262  else
1263  dlog("delayed_peer_deletion_task is already scheduled (current size of _peers_to_delete is ${size})",
1264  ("size", _peers_to_delete.size()));
1265 
1266 #endif
1267  }
1268 
1270  {
1274  }
1275 
1277  {
1281  }
1282 
1284  {
1286  return (uint32_t)(_handshaking_connections.size() + _active_connections.size());
1287  }
1288 
1290  {
1292  {
1294  for (const peer_connection_ptr& active_peer : _active_connections)
1295  if (node_id == active_peer->node_id)
1296  return active_peer;
1297  }
1298  {
1300  for (const peer_connection_ptr& handshaking_peer : _handshaking_connections)
1301  if (node_id == handshaking_peer->node_id)
1302  return handshaking_peer;
1303  }
1304  return peer_connection_ptr();
1305  }
1306 
1307  // merge addresses received from a peer into our database
1308  bool node_impl::merge_address_info_with_potential_peer_database(const std::vector<address_info> addresses)
1309  {
1311  bool new_information_received = false;
1312  for (const address_info& address : addresses)
1313  {
1314  // If the peer's inbound port is 0, we don't add it to our peer database.
1315  // Although it should have been handled by the caller, be defensive here.
1316  if( 0 == address.remote_endpoint.port() )
1317  continue;
1318  // Note: if found, a copy is returned
1319  auto updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_ep(address.remote_endpoint);
1320  // Note:
1321  // We don't save node_id in the peer database so far
1322  // 1. node_id of that peer may have changed, but we don't check or update
1323  // 2. we don't check by node_id either, in case when a peer's IP address has changed, we don't handle it
1324  // 3. if the peer's inbound port is not 0, no matter if the address is reported as firewalled or not,
1325  // we add it to our database and check by ourselves later
1326  if (address.last_seen_time > updated_peer_record.last_seen_time) // usually true, except when received from
1327  // multiple peers in the same second
1328  {
1329  new_information_received = true;
1330  updated_peer_record.last_seen_time = address.last_seen_time;
1331  _potential_peer_db.update_entry(updated_peer_record);
1332  }
1333  }
1334  // TODO maybe delete too old info by the way
1335  return new_information_received;
1336  }
1337 
1339  {
1341  dlog("Currently have ${current} of [${desired}/${max}] connections",
1342  ("current", get_number_of_connections())
1343  ("desired", _desired_number_of_connections)
1345  dlog(" my id is ${id}", ("id", _node_id));
1346 
1347  {
1349  for (const peer_connection_ptr& active_connection : _active_connections)
1350  {
1351  dlog(" active: ${endpoint} with ${id} [${direction}]",
1352  ("endpoint", active_connection->get_remote_endpoint())
1353  ("id", active_connection->node_id)
1354  ("direction", active_connection->direction));
1355  }
1356  }
1357  {
1359  for (const peer_connection_ptr& handshaking_connection : _handshaking_connections)
1360  {
1361  dlog(" handshaking: ${endpoint} with ${id} [${direction}]",
1362  ("endpoint", handshaking_connection->get_remote_endpoint())
1363  ("id", handshaking_connection->node_id)
1364  ("direction", handshaking_connection->direction));
1365  }
1366  }
1367  }
1368 
1369  void node_impl::on_message( peer_connection* originating_peer, const message& received_message )
1370  {
1372  message_hash_type message_hash = received_message.id();
1373  dlog("handling message ${type} ${hash} size ${size} from peer ${endpoint}",
1374  ("type", graphene::net::core_message_type_enum(received_message.msg_type.value()))("hash", message_hash)
1375  ("size", received_message.size)
1376  ("endpoint", originating_peer->get_remote_endpoint()));
1377  // Gatekeeping code
1378  if( originating_peer->we_have_requested_close
1379  // allow hello_message so we can learn more about the peer
1380  && received_message.msg_type.value() != core_message_type_enum::hello_message_type
1381  // allow closing_connection_message so we can finish disconnecting
1382  && received_message.msg_type.value() != core_message_type_enum::closing_connection_message_type )
1383  {
1384  dlog( "Unexpected message from peer ${peer} while we have requested to close connection",
1385  ("peer", originating_peer->get_remote_endpoint()) );
1386  return;
1387  }
1388  switch ( received_message.msg_type.value() )
1389  {
1391  on_hello_message(originating_peer, received_message.as<hello_message>());
1392  break;
1394  on_connection_accepted_message(originating_peer, received_message.as<connection_accepted_message>());
1395  break;
1397  on_connection_rejected_message(originating_peer, received_message.as<connection_rejected_message>());
1398  break;
1400  on_address_request_message(originating_peer, received_message.as<address_request_message>());
1401  break;
1403  on_address_message(originating_peer, received_message.as<address_message>());
1404  break;
1407  originating_peer, received_message.as<fetch_blockchain_item_ids_message>());
1408  break;
1411  originating_peer, received_message.as<blockchain_item_ids_inventory_message>());
1412  break;
1414  on_fetch_items_message(originating_peer, received_message.as<fetch_items_message>());
1415  break;
1417  on_item_not_available_message(originating_peer, received_message.as<item_not_available_message>());
1418  break;
1420  on_item_ids_inventory_message(originating_peer, received_message.as<item_ids_inventory_message>());
1421  break;
1423  on_closing_connection_message(originating_peer, received_message.as<closing_connection_message>());
1424  break;
1426  process_block_message(originating_peer, received_message, message_hash);
1427  break;
1429  on_current_time_request_message(originating_peer, received_message.as<current_time_request_message>());
1430  break;
1432  on_current_time_reply_message(originating_peer, received_message.as<current_time_reply_message>());
1433  break;
1435  break;
1437  break;
1439  break;
1441  break;
1442 
1443  default:
1444  // ignore any message in between core_message_type_first and _last that we don't handle above
1445  // to allow us to add messages in the future
1446  if (received_message.msg_type.value() < core_message_type_enum::core_message_type_first ||
1447  received_message.msg_type.value() > core_message_type_enum::core_message_type_last)
1448  process_ordinary_message(originating_peer, received_message, message_hash);
1449  break;
1450  }
1451  }
1452 
1453 
1455  {
1457  // for the time being, shoehorn a bunch of properties into the user_data variant object,
1458  // which lets us add and remove fields without changing the protocol. Once we
1459  // settle on what we really want in there, we'll likely promote them to first
1460  // class fields in the hello message
1461  fc::mutable_variant_object user_data;
1462  user_data["fc_git_revision_sha"] = fc::git_revision_sha;
1463  user_data["fc_git_revision_unix_timestamp"] = fc::git_revision_unix_timestamp;
1464 #if defined( __APPLE__ )
1465  user_data["platform"] = "osx";
1466 #elif defined( __OpenBSD__ )
1467  user_data["platform"] = "obsd";
1468 #elif defined( __linux__ )
1469  user_data["platform"] = "linux";
1470 #elif defined( _MSC_VER )
1471  user_data["platform"] = "win32";
1472 #else
1473  user_data["platform"] = "other";
1474 #endif
1475  user_data["bitness"] = sizeof(void*) * 8;
1476 
1477  user_data["node_id"] = fc::variant( _node_id, 1 );
1478 
1479  item_hash_t head_block_id = _delegate->get_head_block_id();
1480  user_data["last_known_block_hash"] = fc::variant( head_block_id, 1 );
1481  user_data["last_known_block_number"] = _delegate->get_block_number(head_block_id);
1482  user_data["last_known_block_time"] = _delegate->get_block_time(head_block_id);
1483 
1484  if (!_hard_fork_block_numbers.empty())
1485  user_data["last_known_fork_block_number"] = _hard_fork_block_numbers.back();
1486 
1487  return user_data;
1488  }
1490  {
1492  // try to parse data out of the user_agent string
1493  if (user_data.contains("graphene_git_revision_sha"))
1494  originating_peer->graphene_git_revision_sha = user_data["graphene_git_revision_sha"].as_string();
1495  if (user_data.contains("graphene_git_revision_unix_timestamp"))
1497  user_data["graphene_git_revision_unix_timestamp"].as<uint32_t>(1));
1498  if (user_data.contains("fc_git_revision_sha"))
1499  originating_peer->fc_git_revision_sha = user_data["fc_git_revision_sha"].as_string();
1500  if (user_data.contains("fc_git_revision_unix_timestamp"))
1502  user_data["fc_git_revision_unix_timestamp"].as<uint32_t>(1));
1503  if (user_data.contains("platform"))
1504  originating_peer->platform = user_data["platform"].as_string();
1505  if (user_data.contains("bitness"))
1506  originating_peer->bitness = user_data["bitness"].as<uint32_t>(1);
1507  if (user_data.contains("node_id"))
1508  originating_peer->node_id = user_data["node_id"].as<node_id_t>(1);
1509  if (user_data.contains("last_known_fork_block_number"))
1510  originating_peer->last_known_fork_block_number = user_data["last_known_fork_block_number"].as<uint32_t>(1);
1511  }
1512 
1513  void node_impl::on_hello_message( peer_connection* originating_peer, const hello_message& hello_message_received )
1514  {
1516  auto remote_endpoint = originating_peer->get_remote_endpoint(); // Note: this returns a copy
1517  // Do gatekeeping first
1519  {
1520  // we can wind up here if we've connected to ourselves, and the source and
1521  // destination endpoints are the same, causing messages we send out
1522  // to arrive back on the initiating socket instead of the receiving
1523  // socket. If we did a complete job of enumerating local addresses,
1524  // we could avoid directly connecting to ourselves, or at least detect
1525  // immediately when we did it and disconnect.
1526 
1527  // The only way I know of that we'd get an unexpected hello that we
1528  // can't really guard against is if we do a simulatenous open, we
1529  // probably need to think through that case. We're not attempting that
1530  // yet, though, so it's ok to just disconnect here.
1531  wlog( "Unexpected hello_message from peer ${peer}, disconnecting",
1532  ("peer", remote_endpoint) );
1533  disconnect_from_peer( originating_peer, "Received an unexpected hello_message" );
1534  return;
1535  }
1536 
1537  // Check chain_id
1538  if( hello_message_received.chain_id != _chain_id )
1539  {
1540  wlog( "Received hello message from peer ${peer} on a different chain: ${message}",
1541  ("peer", remote_endpoint)
1542  ("message", hello_message_received) );
1543  // If it is an outbound connection, make sure we won't reconnect to the peer soon
1544  if( peer_connection_direction::outbound == originating_peer->direction )
1545  {
1546  // Note: deleting is not the best approach since it can be readded soon and we will reconnect soon.
1547  // Marking it "permanently rejected" is also not good enough since the peer can be "fixed".
1548  // It seems the best approach is to reduce its weight significantly.
1549  greatly_delay_next_conn_to( this, *remote_endpoint );
1550  }
1551  // Now reject
1552  std::ostringstream rejection_message;
1553  rejection_message << "You're on a different chain than I am. I'm on " << _chain_id.str() <<
1554  " and you're on " << hello_message_received.chain_id.str();
1556  *remote_endpoint,
1558  rejection_message.str() );
1560  originating_peer->send_message( message(connection_rejected) );
1561  // for this type of message, we're immediately disconnecting this peer, instead of trying to
1562  // allowing her to ask us for peers (any of our peers will be on the same chain as us, so there's no
1563  // benefit of sharing them)
1564  disconnect_from_peer( originating_peer, "You are on a different chain from me" );
1565  return;
1566  }
1567 
1568  // Validate the peer's public key.
1569  // Note: the node_id in user_data is not verified.
1570  fc::optional<fc::ecc::public_key> expected_node_public_key;
1571  try
1572  {
1573  fc::sha256::encoder shared_secret_encoder;
1574  fc::sha512 shared_secret = originating_peer->get_shared_secret();
1575  shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
1576  expected_node_public_key = fc::ecc::public_key( hello_message_received.signed_shared_secret,
1577  shared_secret_encoder.result(), false );
1578  }
1579  catch( const fc::exception& e )
1580  {
1581  wlog( "Error when validating signature in hello message from peer ${peer}: ${e}",
1582  ("peer", remote_endpoint)("e", e.to_detail_string()) );
1583  }
1584 
1585  if( !expected_node_public_key
1586  || hello_message_received.node_public_key != expected_node_public_key->serialize() )
1587  {
1588  wlog( "Invalid signature in hello message from peer ${peer}",
1589  ("peer", remote_endpoint) );
1591  *remote_endpoint,
1593  "Invalid signature in hello message" );
1595  originating_peer->send_message( message(connection_rejected) );
1596  // for this type of message, we're immediately disconnecting this peer
1597  disconnect_from_peer( originating_peer, connection_rejected.reason_string );
1598  return;
1599  }
1600 
1601  // this already_connected check must come before we fill in peer data below
1602  node_id_t peer_node_id = hello_message_received.node_public_key;
1603  try
1604  {
1605  peer_node_id = hello_message_received.user_data["node_id"].as<node_id_t>(1);
1606  }
1607  catch (const fc::exception&)
1608  {
1609  // either it's not there or it's not a valid session id. either way, ignore.
1610  dlog( "Peer ${endpoint} sent us a hello message without a valid node_id in user_data",
1611  ("endpoint", remote_endpoint ) );
1612  }
1613  // The peer's node_id should not be null
1614  static const node_id_t null_node_id;
1615  if( null_node_id == peer_node_id )
1616  {
1617  wlog( "The node_id in the hello_message from peer ${peer} is null, disconnecting",
1618  ("peer", remote_endpoint) );
1619  disconnect_from_peer( originating_peer, "Your node_id in the hello_message is null" );
1620  return;
1621  }
1622  // Check whether the peer is myself
1623  if( _node_id == peer_node_id )
1624  {
1625  ilog( "Received a hello_message from peer ${peer} with id ${id} that is myself or claimed to be myself, "
1626  "rejection",
1627  ("peer", remote_endpoint)
1628  ("id", peer_node_id) );
1629  // If it is an outbound connection, make sure we won't reconnect to the peer soon
1630  if( peer_connection_direction::outbound == originating_peer->direction )
1631  {
1632  // Note: deleting is not the best approach since it can be readded soon and we will reconnect soon.
1633  // Marking it "permanently rejected" is also not good enough since the peer can be "fixed".
1634  // It seems the best approach is to reduce its weight significantly.
1635  greatly_delay_next_conn_to( this, *remote_endpoint );
1636  }
1637  // Now reject
1638  // Note: this can happen in rare cases if the peer is not actually myself but another node.
1639  // Anyway, we see it as ourselves, reject it and disconnect it.
1641  *remote_endpoint,
1643  "I'm connecting to myself" );
1645  originating_peer->send_message( message(connection_rejected) );
1646  disconnect_from_peer( originating_peer, connection_rejected.reason_string );
1647  return;
1648  }
1649  // Get a pointer to an exising connection to the peer (if one exists) for later use
1650  peer_connection_ptr already_connected_peer = get_peer_by_node_id( peer_node_id );
1651 
1652  // store off the data provided in the hello message
1653  originating_peer->user_agent = hello_message_received.user_agent;
1654  originating_peer->node_public_key = hello_message_received.node_public_key;
1655  originating_peer->core_protocol_version = hello_message_received.core_protocol_version;
1656  originating_peer->inbound_address = hello_message_received.inbound_address;
1657  originating_peer->inbound_port = hello_message_received.inbound_port;
1658  originating_peer->outbound_port = hello_message_received.outbound_port;
1659  // Note: more data is stored after initialized remote_inbound_endpoint
1660 
1661  // For an outbound connection, we know the remote_inbound_endpoint already, so keep it unchanged.
1662  // For an inbound connection, we initialize it here.
1663  if( !originating_peer->remote_inbound_endpoint )
1664  {
1665  // Note: the data is not yet verified, so we need to use it with caution.
1666  //
1667  // We will advertise "remote_inbound_endpoint" when other peers request addresses.
1668  //
1669  // On the one hand, we want to advertise as accurate data as possible to other peers (we will try to verify),
1670  // on the other hand, we still want to advertise it to other peers if we didn't have a chance to verify it.
1671  //
1672  // When the peer is not listening (i.e. it tells us its inbound port is 0), the inbound address it tells us
1673  // may be invalid (e.g. 0.0.0.0), and we are not going to verify it anyway.
1674  // For observation purposes, we still advertise it to other peers, and we need to tell them an address,
1675  // so we use the address we see.
1676  //
1677  // In addition, by now, our list or exclude list for peer advertisement only contains IP endpoints but not
1678  // nodes' public keys (we can't use node_id because it changes every time the node restarts). Using a valid
1679  // address is better for the purpose.
1680  if( 0 == originating_peer->inbound_port )
1681  originating_peer->remote_inbound_endpoint = fc::ip::endpoint( remote_endpoint->get_address() );
1682  else if( originating_peer->inbound_address.is_public_address()
1683  || originating_peer->inbound_address == remote_endpoint->get_address() )
1684  originating_peer->remote_inbound_endpoint = fc::ip::endpoint( originating_peer->inbound_address,
1685  originating_peer->inbound_port );
1686  else
1687  originating_peer->remote_inbound_endpoint = remote_endpoint;
1688  }
1689 
1690  // Note: store node_id after initialized remote_inbound_endpoint to avoid a race condition
1691 
1692  // will probably be overwritten in parse_hello_user_data_for_peer()
1693  originating_peer->node_id = hello_message_received.node_public_key;
1694 
1695  parse_hello_user_data_for_peer(originating_peer, hello_message_received.user_data);
1696 
1697  // if they didn't provide a last known fork, try to guess it
1698  if (originating_peer->last_known_fork_block_number == 0 &&
1699  originating_peer->graphene_git_revision_unix_timestamp)
1700  {
1701  uint32_t unix_timestamp = originating_peer->graphene_git_revision_unix_timestamp->sec_since_epoch();
1702  originating_peer->last_known_fork_block_number = _delegate->estimate_last_known_fork_from_git_revision_timestamp(unix_timestamp);
1703  }
1704 
1705  // now decide what to do with it
1706  if (originating_peer->last_known_fork_block_number != 0)
1707  {
1708  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(originating_peer->last_known_fork_block_number);
1709  if (next_fork_block_number != 0)
1710  {
1711  // we know about a fork they don't. See if we've already passed that block. If we have, don't let them
1712  // connect because we won't be able to give them anything useful
1713  uint32_t head_block_num = _delegate->get_block_number(_delegate->get_head_block_id());
1714  if (next_fork_block_number < head_block_num)
1715  {
1716 #ifdef ENABLE_DEBUG_ULOGS
1717  ulog("Rejecting connection from peer because their version is too old. Their version date: ${date}", ("date", originating_peer->graphene_git_revision_unix_timestamp));
1718 #endif
1719  wlog("Received hello message from peer running a version of that can only understand blocks up to #${their_hard_fork}, but I'm at head block number #${my_block_number}",
1720  ("their_hard_fork", next_fork_block_number)("my_block_number", head_block_num));
1721  std::ostringstream rejection_message;
1722  rejection_message << "Your client is outdated -- you can only understand blocks up to #" << next_fork_block_number << ", but I'm already on block #" << head_block_num;
1724  *remote_endpoint,
1726  rejection_message.str() );
1727 
1729  originating_peer->send_message(message(connection_rejected));
1730  // for this type of message, we're immediately disconnecting this peer, instead of trying to
1731  // allowing her to ask us for peers (any of our peers will be on the same chain as us, so there's no
1732  // benefit of sharing them)
1733  disconnect_from_peer(originating_peer, "Your client is too old, please upgrade");
1734  return;
1735  }
1736  }
1737  }
1738 
1739  if( peer_connection_ptr() != already_connected_peer )
1740  {
1741  // If it is an outbound connection, update the existing connection's inbound_endpoint.
1742  // Note: there may be a race condition that multiple tasks try to write the same data
1743  if( peer_connection_direction::outbound == originating_peer->direction
1744  && originating_peer->node_public_key == already_connected_peer->node_public_key )
1745  {
1746  auto already_connected_endpoint = already_connected_peer->get_remote_endpoint(); // This returns a copy
1747  ilog( "Verified that endpoint ${ep} is reachable and belongs to peer ${peer} with id ${id}",
1748  ("ep", remote_endpoint)
1749  ("peer", already_connected_endpoint)
1750  ("id", already_connected_peer->node_id) );
1751  // Do not replace a verified public address with a private or local address.
1752  // Note: there is a scenario that some nodes in the same local network may have connected to each other,
1753  // and of course some are outbound connections and some are inbound, so we are unable to update
1754  // all the data, not to mention that their external addresses might be inaccessible to each
1755  // other.
1756  // Unless they are all configured with the "p2p-inbound-endpoint" option with an external address,
1757  // even if they all start out connecting to each other's external addresses, at some point they
1758  // may try to connect to each other's local addresses and possibly stay connected.
1759  // In this case, if the nodes aren't configured with the "advertise-peer-algorithm" option and
1760  // related options properly, when advertising connected peers to other peers, they may expose
1761  // that they are in the same local network and connected to each other.
1762  // On the other hand, when we skip updates in some cases, we may end up trying to reconnect soon
1763  // and endlessly (which is addressed with additional_inbound_endpoints).
1764  already_connected_peer->additional_inbound_endpoints.insert( *remote_endpoint );
1765  if( peer_connection_direction::inbound == already_connected_peer->direction )
1766  {
1767  already_connected_peer->potential_inbound_endpoints[*remote_endpoint]
1769  }
1770  if( already_connected_peer->is_firewalled != firewalled_state::not_firewalled // implies it's inbound
1771  || remote_endpoint->get_address().is_public_address()
1772  || !already_connected_peer->get_endpoint_for_connecting()->get_address().is_public_address() )
1773  {
1774  ilog( "Saving verification result ${ep} for peer ${peer} with id ${id}",
1775  ("ep", remote_endpoint)
1776  ("peer", already_connected_endpoint)
1777  ("id", already_connected_peer->node_id) );
1778  already_connected_peer->remote_inbound_endpoint = remote_endpoint;
1779  already_connected_peer->is_firewalled = firewalled_state::not_firewalled;
1780  }
1781  // If the already connected peer is in the active connections list, save the endpoint to the peer db
1783  == already_connected_peer->negotiation_status )
1784  save_successful_address( this, *remote_endpoint );
1785  }
1786  // Now reject
1788  *remote_endpoint,
1790  "I'm already connected to you" );
1792  originating_peer->send_message( message(connection_rejected) );
1793  ilog("Received a hello_message from peer ${peer} that I'm already connected to (with id ${id}), rejection",
1794  ("peer", remote_endpoint)
1795  ("id", originating_peer->node_id));
1796  // If already connected, we disconnect
1797  disconnect_from_peer( originating_peer, connection_rejected.reason_string );
1798  }
1799 #ifdef ENABLE_P2P_DEBUGGING_API
1800  else if(!_allowed_peers.empty() &&
1801  _allowed_peers.find(originating_peer->node_id) == _allowed_peers.end())
1802  {
1804  *remote_endpoint,
1806  "you are not in my allowed_peers list");
1808  originating_peer->send_message( message(connection_rejected ) );
1809  dlog( "Received a hello_message from peer ${peer} who isn't in my allowed_peers list, rejection",
1810  ("peer", remote_endpoint ) );
1811  }
1812 #endif // ENABLE_P2P_DEBUGGING_API
1813  else
1814  {
1815  // whether we're planning on accepting them as a peer or not, they seem to be a valid node,
1816  // so add them to our database if they're not firewalled
1817  if( peer_connection_direction::outbound == originating_peer->direction )
1818  {
1819  // For outbound connection, we already know the peer is not firewalled,
1820  // and it should be already in the peer database. Do nothing here.
1821  }
1822  else if( 0 == originating_peer->inbound_port )
1823  {
1824  ilog( "peer ${peer} did not give an inbound port so I'm treating them as if they are firewalled.",
1825  ("peer", remote_endpoint) );
1826  originating_peer->is_firewalled = firewalled_state::firewalled;
1827  }
1828  else
1829  {
1830  // Note: no matter how we guess, we end up adding these to our peer database and trying to connect later.
1831 
1832  // First, we add the inbound endpoint that the peer told us it is listening on.
1833  fc::flat_set<fc::ip::endpoint> endpoints_to_save;
1834  endpoints_to_save.insert( fc::ip::endpoint( originating_peer->inbound_address,
1835  originating_peer->inbound_port ) );
1836 
1837  // Second, we add the address and port we see.
1838  // It might be the same as above, but that's OK.
1839  endpoints_to_save.insert( *remote_endpoint );
1840 
1841  // Third, we add the address we see, with the inbound port the peer told us.
1842  // It might be the same as above, but that's OK.
1843  endpoints_to_save.insert( fc::ip::endpoint( remote_endpoint->get_address(),
1844  originating_peer->inbound_port ) );
1845 
1846  ilog( "Saving potential endpoints to the peer database for peer ${peer}: ${endpoints}",
1847  ("peer", remote_endpoint) ("endpoints", endpoints_to_save) );
1848 
1849  for( const auto& ep : endpoints_to_save )
1850  {
1851  // add to the peer database
1852  auto updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_ep( ep );
1853  updated_peer_record.last_seen_time = fc::time_point::now();
1854  _potential_peer_db.update_entry( updated_peer_record );
1855  // mark as a potential inbound address
1857  }
1858 
1859  // Note: we don't update originating_peer->is_firewalled, because we might guess wrong
1860 
1861  }
1862 
1864  {
1866  *remote_endpoint,
1868  "not accepting any more incoming connections");
1870  originating_peer->send_message(message(connection_rejected));
1871  ilog("Received a hello_message from peer ${peer}, but I'm not accepting any more connections, rejection",
1872  ("peer", remote_endpoint));
1873  }
1874  else
1875  {
1877  originating_peer->send_message(message(connection_accepted_message()));
1878  ilog("Received a hello_message from peer ${peer}, sending reply to accept connection",
1879  ("peer", remote_endpoint));
1880  }
1881  }
1882  }
1883 
1885  const connection_accepted_message& ) const
1886  {
1888  // Gatekeeping code
1889  // We only send one address request message shortly after connected
1891  {
1892  // Log and ignore
1893  wlog( "Received an unexpected connection_accepted message from ${peer}",
1894  ("peer", originating_peer->get_remote_endpoint()) );
1895  return;
1896  }
1897 
1898  ilog( "Received a connection_accepted in response to my \"hello\" from ${peer}",
1899  ("peer", originating_peer->get_remote_endpoint()) );
1902  originating_peer->expecting_address_message = true;
1903  originating_peer->send_message(address_request_message());
1904  }
1905 
1906  void node_impl::on_connection_rejected_message(peer_connection* originating_peer, const connection_rejected_message& connection_rejected_message_received)
1907  {
1910  {
1911  ilog("Received a rejection from ${peer} in response to my \"hello\", reason: \"${reason}\"",
1912  ("peer", originating_peer->get_remote_endpoint())
1913  ("reason", connection_rejected_message_received.reason_string));
1914 
1918 
1919  if( connection_rejected_message_received.reason_code == rejection_reason_code::connected_to_self
1920  || connection_rejected_message_received.reason_code == rejection_reason_code::different_chain )
1921  {
1922  // Using remote_endpoint here for an outbound connection is OK.
1923  // For an inbound connection, we should have not saved anything to the peer database yet, nor we will
1924  // save anything (it would be weird if they rejected us but we didn't reject them),
1925  // so using remote_endpoint here at least won't do anything bad.
1926  // Note: we should not erase or update data by the peer's claimed inbound_address,
1927  // because the data is still unreliable.
1928  // Note: deleting is not the best approach since it can be readded soon and we will reconnect soon.
1929  // Marking it "permanently rejected" is also not good enough since the peer can be "fixed".
1930  // It seems the best approach is to reduce its weight significantly.
1931  greatly_delay_next_conn_to( this, *originating_peer->get_remote_endpoint() );
1932  // Note: we do not send closing_connection_message, but close directly. This is probably OK
1933  move_peer_to_closing_list(originating_peer->shared_from_this());
1934  originating_peer->close_connection();
1935  }
1936  // Note: ideally, if it is an outbound connection, and the rejection reason is "already_connected",
1937  // we should update the existing connection's inbound_endpoint and mark it as verified.
1938  // However, at the moment maybe we haven't processed its hello message,
1939  // so don't know its node_id and unable to locate the existing connection.
1940  // So it is better to do the update in on_hello_message().
1941  // It is also possible that its hello message comes too late and the connection is already closed,
1942  // in which case we don't have a chance to update anyway.
1943  else
1944  {
1945  // update our database to record that we were rejected so we won't try to connect again for a while
1946  // this only happens on connections we originate, so we should already know that peer is not firewalled
1947  fc::optional<potential_peer_record> updated_peer_record
1949  if (updated_peer_record)
1950  {
1951  updated_peer_record->last_connection_disposition = last_connection_rejected;
1952  updated_peer_record->last_connection_attempt_time = fc::time_point::now();
1953  // Note: we do not increase number_of_failed_connection_attempts here, this is probably OK
1954  _potential_peer_db.update_entry(*updated_peer_record);
1955  }
1956  originating_peer->expecting_address_message = true;
1957  originating_peer->send_message(address_request_message());
1958  }
1959  }
1960  else
1961  {
1962  // Note: in older versions, FC_THROW() was called here,
1963  // which would cause on_connection_closed() to be called,
1964  // which would then close the connection when the peer_connection object was destroyed.
1965  // Explicitly closing the connection here is more intuitive.
1966  wlog( "Unexpected connection_rejected_message from peer ${peer}, disconnecting",
1967  ("peer", originating_peer->get_remote_endpoint()) );
1968  disconnect_from_peer( originating_peer, "Received an unexpected connection_rejected_message" );
1969  }
1970  }
1971 
1973  {
1975  // Gatekeeping code
1978  {
1979  wlog( "Unexpected address_request_message from peer ${peer}, disconnecting",
1980  ("peer", originating_peer->get_remote_endpoint()) );
1981  disconnect_from_peer( originating_peer, "Received an unexpected address_request_message" );
1982  return;
1983  }
1984 
1985  dlog( "Received an address request message from peer ${peer}",
1986  ("peer", originating_peer->get_remote_endpoint()) );
1987 
1988  address_message reply;
1989  if (_address_builder != nullptr )
1990  _address_builder->build( this, reply );
1991  originating_peer->send_message(reply);
1992 
1993  // If we rejected their connection, disconnect now
1995  {
1996  disconnect_from_peer( originating_peer,
1997  "I rejected your connection request (hello message) so I'm disconnecting" );
1998  }
1999  }
2000 
2001  void node_impl::set_advertise_algorithm( const std::string& algo,
2002  const std::vector<std::string>& advertise_or_exclude_list )
2003  {
2005  if (algo == "exclude_list")
2006  {
2007  _address_builder = std::make_shared<exclude_address_builder>(advertise_or_exclude_list);
2008  }
2009  else if (algo == "list")
2010  {
2011  _address_builder = std::make_shared<list_address_builder>(advertise_or_exclude_list);
2012  }
2013  else if (algo == "nothing")
2014  {
2015  _address_builder = nullptr;
2016  }
2017  else
2018  _address_builder = std::make_shared<all_address_builder>();
2019  }
2020 
2022  const address_message& address_message_received )
2023  {
2025  // Do some gatekeeping here.
2026  // Malious peers can easily bypass our checks in on_hello_message(), and we will then request addresses anyway,
2027  // so checking connection_state here is useless.
2028  // The size can be large, so we only handle the first N addresses.
2029  // The peer might send us lots of address messages even if we didn't request,
2030  // so we'd better know whether we have sent an address request message recently.
2031  if( !originating_peer->expecting_address_message )
2032  {
2033  // Log and ignore
2034  wlog( "Received an unexpected address message containing ${size} addresses for peer ${peer}",
2035  ("size", address_message_received.addresses.size())
2036  ("peer", originating_peer->get_remote_endpoint()) );
2037  return;
2038  }
2039  originating_peer->expecting_address_message = false;
2040 
2041  dlog( "Received an address message containing ${size} addresses for peer ${peer}",
2042  ("size", address_message_received.addresses.size())
2043  ("peer", originating_peer->get_remote_endpoint()) );
2045  {
2046  size_t count = 0;
2047  for (const address_info& address : address_message_received.addresses)
2048  {
2049  dlog( " ${endpoint} last seen ${time}, firewalled status ${fw}",
2050  ("endpoint", address.remote_endpoint)("time", address.last_seen_time)
2051  ("fw", address.firewalled) );
2052  ++count;
2053  if( count >= _max_addrs_to_handle_at_once )
2054  break;
2055  }
2056  std::vector<graphene::net::address_info> updated_addresses;
2057  updated_addresses.reserve( count );
2059  count = 0;
2060  for( const address_info& address : address_message_received.addresses )
2061  {
2062  if( 0 == address.remote_endpoint.port() )
2063  continue;
2064  updated_addresses.emplace_back( address.remote_endpoint,
2065  now,
2066  address.latency,
2067  address.node_id,
2068  address.direction,
2069  address.firewalled );
2070  ++count;
2071  if( count >= _max_addrs_to_handle_at_once )
2072  break;
2073  }
2074  if( merge_address_info_with_potential_peer_database(updated_addresses) )
2076  }
2077 
2078  if (_handshaking_connections.find(originating_peer->shared_from_this()) != _handshaking_connections.end())
2079  {
2080  // if we were handshaking, we need to continue with the next step in handshaking (which is either
2081  // ending handshaking and starting synchronization or disconnecting)
2083  disconnect_from_peer(originating_peer, "You rejected my connection request (hello message) so I'm disconnecting");
2085  disconnect_from_peer(originating_peer, "I rejected your connection request (hello message) so I'm disconnecting");
2086  else
2087  {
2088  // Note: updating last_connection_disposition to last_connection_succeeded for inbound connections
2089  // doesn't seem correct
2090  if( peer_connection_direction::outbound == originating_peer->direction )
2091  save_successful_address( this, *originating_peer->get_remote_endpoint() );
2092 
2093  // transition it to our active list
2095  move_peer_to_active_list(originating_peer->shared_from_this());
2096  new_peer_just_added(originating_peer->shared_from_this());
2097  }
2098  }
2099  // else if this was an active connection, then this was just a reply to our periodic address requests.
2100  // we've processed it, there's nothing else to do
2101  // Note: we could reinitialize inbound endpoint verification here, but it doesn't seem necessary
2102  }
2103 
2105  const fetch_blockchain_item_ids_message& fetch_blockchain_item_ids_message_received)
2106  {
2108  // Gatekeeping code
2110  {
2111  wlog( "Unexpected fetch_blockchain_item_ids_message from peer ${peer}, disconnecting",
2112  ("peer", originating_peer->get_remote_endpoint()) );
2113  disconnect_from_peer( originating_peer, "Received an unexpected fetch_blockchain_item_ids_message" );
2114  return;
2115  }
2116 
2117  item_id peers_last_item_seen = item_id(fetch_blockchain_item_ids_message_received.item_type, item_hash_t());
2118  if (fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty())
2119  {
2120  dlog("sync: received a request for item ids starting at the beginning of the chain "
2121  "from peer ${peer_endpoint} (full request: ${synopsis})",
2122  ("peer_endpoint", originating_peer->get_remote_endpoint())
2123  ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
2124  }
2125  else
2126  {
2127  item_hash_t peers_last_item_hash_seen = fetch_blockchain_item_ids_message_received.blockchain_synopsis.back();
2128  dlog("sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})",
2129  ("last_item_seen", peers_last_item_hash_seen)
2130  ("peer_endpoint", originating_peer->get_remote_endpoint())
2131  ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis));
2132  peers_last_item_seen.item_hash = peers_last_item_hash_seen;
2133  }
2134 
2136  reply_message.item_type = fetch_blockchain_item_ids_message_received.item_type;
2137  reply_message.total_remaining_item_count = 0;
2138  try
2139  {
2140  reply_message.item_hashes_available
2141  = _delegate->get_block_ids(fetch_blockchain_item_ids_message_received.blockchain_synopsis,
2142  reply_message.total_remaining_item_count);
2143  }
2144  catch (const peer_is_on_an_unreachable_fork&)
2145  {
2146  dlog("Peer is on a fork and there's no set of blocks we can provide to switch them to our fork");
2147  // we reply with an empty list as if we had an empty blockchain;
2148  // we don't want to disconnect because they may be able to provide
2149  // us with blocks on their chain
2150  }
2151 
2152  bool disconnect_from_inhibited_peer = false;
2153  // if our client doesn't have any items after the item the peer requested, it will send back
2154  // a list containing the last item the peer requested
2155  //idump((reply_message)(fetch_blockchain_item_ids_message_received.blockchain_synopsis)); // for debug
2156  if( reply_message.item_hashes_available.empty() )
2157  originating_peer->peer_needs_sync_items_from_us = false; /* I have no items in my blockchain */
2158  else if( !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
2159  reply_message.item_hashes_available.size() == 1 &&
2160  std::find(fetch_blockchain_item_ids_message_received.blockchain_synopsis.begin(),
2161  fetch_blockchain_item_ids_message_received.blockchain_synopsis.end(),
2162  reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() )
2163  {
2164  /* the last item in the peer's list matches the last item in our list */
2165  originating_peer->peer_needs_sync_items_from_us = false;
2166  if (originating_peer->inhibit_fetching_sync_blocks)
2167  disconnect_from_inhibited_peer = true; // delay disconnecting until after we send our reply to this fetch_blockchain_item_ids_message
2168  }
2169  else
2170  originating_peer->peer_needs_sync_items_from_us = true;
2171 
2172  if (!originating_peer->peer_needs_sync_items_from_us)
2173  {
2174  dlog("sync: peer is already in sync with us");
2175  // if we thought we had all the items this peer had, but now it turns out that we don't
2176  // have the last item it requested to send from,
2177  // we need to kick off another round of synchronization
2178  if (!originating_peer->we_need_sync_items_from_peer &&
2179  !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
2180  !_delegate->has_item(peers_last_item_seen))
2181  {
2182  dlog("sync: restarting sync with peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
2183  start_synchronizing_with_peer(originating_peer->shared_from_this());
2184  }
2185  }
2186  else
2187  {
2188  dlog("sync: peer is out of sync, sending peer ${count} items ids: first: ${first_item_id}, last: ${last_item_id}",
2189  ("count", reply_message.item_hashes_available.size())
2190  ("first_item_id", reply_message.item_hashes_available.front())
2191  ("last_item_id", reply_message.item_hashes_available.back()));
2192  if (!originating_peer->we_need_sync_items_from_peer &&
2193  !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() &&
2194  !_delegate->has_item(peers_last_item_seen))
2195  {
2196  dlog("sync: restarting sync with peer ${peer}", ("peer", originating_peer->get_remote_endpoint()));
2197  start_synchronizing_with_peer(originating_peer->shared_from_this());
2198  }
2199  }
2200  originating_peer->send_message(reply_message);
2201 
2202  if (disconnect_from_inhibited_peer)
2203  {
2204  // the peer has all of our blocks, and we don't want any of theirs, so disconnect them
2205  disconnect_from_peer(originating_peer, "you are on a fork that I'm unable to switch to");
2206  return;
2207  }
2208 
2209  // Why only for inbound connections?
2210  if (originating_peer->direction == peer_connection_direction::inbound &&
2211  _handshaking_connections.find(originating_peer->shared_from_this()) != _handshaking_connections.end())
2212  {
2213  // handshaking is done, move the connection to fully active status and start synchronizing
2214  dlog("peer ${endpoint} which was handshaking with us has started synchronizing with us, "
2215  "start syncing with it",
2216  ("endpoint", originating_peer->get_remote_endpoint()));
2217 
2218  // Note: there was some code here to update the peer database, similar to the code in on_address_message(),
2219  // but this is an inbound connection,
2220  // updating last_connection_disposition to last_connection_succeeded doesn't seem correct,
2221  // so the code was removed.
2222 
2223  // transition it to our active list
2225  move_peer_to_active_list(originating_peer->shared_from_this());
2226  new_peer_just_added(originating_peer->shared_from_this());
2227  }
2228  }
2229 
2231  {
2233  uint32_t max_number_of_unfetched_items = 0;
2235  for( const peer_connection_ptr& peer : _active_connections )
2236  {
2237  uint32_t this_peer_unfetched_items_count = (uint32_t)peer->ids_of_items_to_get.size()
2238  + peer->number_of_unfetched_item_ids;
2239  max_number_of_unfetched_items = std::max(max_number_of_unfetched_items,
2240  this_peer_unfetched_items_count);
2241  }
2242  return max_number_of_unfetched_items;
2243  }
2244 
2245  // get a blockchain synopsis that makes sense to send to the given peer.
2246  // If the peer isn't yet syncing with us, this is just a synopsis of our active blockchain
2247  // If the peer is syncing with us, it is a synopsis of our active blockchain plus the
2248  // blocks the peer has already told us it has
2249  std::vector<item_hash_t> node_impl::create_blockchain_synopsis_for_peer( const peer_connection* peer )
2250  {
2252  item_hash_t reference_point = peer->last_block_delegate_has_seen;
2253 
2254  // when we call _delegate->get_blockchain_synopsis(), we may yield and there's a
2255  // chance this peer's state will change before we get control back. Save off
2256  // the stuff necessary for generating the synopsis.
2257  // This is pretty expensive, we should find a better way to do this
2258  std::vector<item_hash_t> original_ids_of_items_to_get(peer->ids_of_items_to_get.begin(),
2259  peer->ids_of_items_to_get.end());
2260  uint32_t number_of_blocks_after_reference_point = original_ids_of_items_to_get.size();
2261 
2262  std::vector<item_hash_t> synopsis = _delegate->get_blockchain_synopsis(reference_point,
2263  number_of_blocks_after_reference_point);
2264 
2265 #if 0
2266  // just for debugging, enable this and set a breakpoint to step through
2267  if (synopsis.empty())
2268  synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point);
2269 
2270  // TODO: it's possible that the returned synopsis is empty if the blockchain is empty (that's fine)
2271  // or if the reference point is now past our undo history (that's not).
2272  // in the second case, we should mark this peer as one we're unable to sync with and
2273  // disconnect them.
2274  if (reference_point != item_hash_t() && synopsis.empty())
2275  FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to");
2276 #endif
2277 
2278  if( number_of_blocks_after_reference_point )
2279  {
2280  // then the synopsis is incomplete, add the missing elements from ids_of_items_to_get
2281  uint32_t first_block_num_in_ids_to_get = _delegate->get_block_number(original_ids_of_items_to_get.front());
2282  uint32_t true_high_block_num = first_block_num_in_ids_to_get + original_ids_of_items_to_get.size() - 1;
2283 
2284  // in order to generate a seamless synopsis, we need to be using the same low_block_num as the
2285  // backend code; the first block in the synopsis will be the low block number it used
2286  uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front());
2287 
2288  do
2289  {
2290  if( low_block_num >= first_block_num_in_ids_to_get )
2291  synopsis.push_back(original_ids_of_items_to_get[low_block_num - first_block_num_in_ids_to_get]);
2292  low_block_num += (true_high_block_num - low_block_num + 2 ) / 2;
2293  }
2294  while ( low_block_num <= true_high_block_num );
2295  assert(synopsis.back() == original_ids_of_items_to_get.back());
2296  }
2297  return synopsis;
2298  }
2299 
2300  void node_impl::fetch_next_batch_of_item_ids_from_peer( peer_connection* peer, bool reset_fork_tracking_data_for_peer /* = false */ )
2301  {
2303  if( reset_fork_tracking_data_for_peer )
2304  {
2306  peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
2307  }
2308 
2309  fc::oexception synopsis_exception;
2310  try
2311  {
2312  std::vector<item_hash_t> blockchain_synopsis = create_blockchain_synopsis_for_peer( peer );
2313 
2314  item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back();
2315  dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, "
2316  "(full request is ${blockchain_synopsis})",
2317  ( "last_item_seen", last_item_seen )
2318  ( "peer", peer->get_remote_endpoint() )
2319  ( "blockchain_synopsis", blockchain_synopsis ) );
2320  peer->item_ids_requested_from_peer = boost::make_tuple( blockchain_synopsis, fc::time_point::now() );
2321  peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) );
2322  }
2323  catch (const block_older_than_undo_history& e)
2324  {
2325  synopsis_exception = e;
2326  }
2327  if (synopsis_exception)
2328  disconnect_from_peer(peer, "You are on a fork I'm unable to switch to");
2329  }
2330 
2332  const blockchain_item_ids_inventory_message& blockchain_item_ids_inventory_message_received )
2333  {
2335  // ignore unless we asked for the data
2336  if( originating_peer->item_ids_requested_from_peer )
2337  {
2338  // verify that the peer's the block ids the peer sent is a valid response to our request;
2339  // It should either be an empty list of blocks, or a list of blocks that builds off of one of
2340  // the blocks in the synopsis we sent
2341  if (!blockchain_item_ids_inventory_message_received.item_hashes_available.empty())
2342  {
2343  // what's more, it should be a sequential list of blocks, verify that first
2344  uint32_t first_block_number_in_reponse = _delegate->get_block_number(
2345  blockchain_item_ids_inventory_message_received.item_hashes_available.front());
2346  // explicitly convert the size into 32 bit, should be OK
2347  auto total_items = uint32_t( blockchain_item_ids_inventory_message_received.item_hashes_available.size() );
2348  for (uint32_t i = 1; i < total_items; ++i)
2349  {
2350  uint32_t actual_num = _delegate->get_block_number(
2351  blockchain_item_ids_inventory_message_received.item_hashes_available[i]);
2352  uint32_t expected_num = first_block_number_in_reponse + i;
2353  if (actual_num != expected_num)
2354  {
2355  wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, "
2356  "the ${position}th block in their reply was block number ${actual_num}, "
2357  "but it should have been number ${expected_num}",
2358  ("peer_endpoint", originating_peer->get_remote_endpoint())
2359  ("position", i)
2360  ("actual_num", actual_num)
2361  ("expected_num", expected_num));
2362  fc::exception error_for_peer(FC_LOG_MESSAGE(error,
2363  "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, "
2364  "the ${position}th block in their reply was block number ${actual_num}, "
2365  "but it should have been number ${expected_num}",
2366  ("position", i)
2367  ("actual_num", actual_num)
2368  ("expected_num", expected_num)));
2369  disconnect_from_peer(originating_peer,
2370  "You gave an invalid response to my request for sync blocks",
2371  true, error_for_peer);
2372  return;
2373  }
2374  }
2375 
2376  const std::vector<item_hash_t>& synopsis_sent_in_request = originating_peer->item_ids_requested_from_peer->get<0>();
2377  const item_hash_t& first_item_hash = blockchain_item_ids_inventory_message_received.item_hashes_available.front();
2378 
2379  if (synopsis_sent_in_request.empty())
2380  {
2381  // if we sent an empty synopsis, we were asking for all blocks, so the first block should be block 1
2382  if (_delegate->get_block_number(first_item_hash) != 1)
2383  {
2384  wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks starting from the beginning of the chain, "
2385  "but they provided a list of blocks starting with ${first_block}",
2386  ("peer_endpoint", originating_peer->get_remote_endpoint())
2387  ("first_block", first_item_hash));
2388  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks starting from the beginning of the chain, "
2389  "but you returned a list of blocks starting with ${first_block}",
2390  ("first_block", first_item_hash)));
2391  disconnect_from_peer(originating_peer,
2392  "You gave an invalid response to my request for sync blocks",
2393  true, error_for_peer);
2394  return;
2395  }
2396  }
2397  else // synopsis was not empty, we expect a response building off one of the blocks we sent
2398  {
2399  if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end())
2400  {
2401  wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks based on the synopsis ${synopsis}, but they "
2402  "provided a list of blocks starting with ${first_block}",
2403  ("peer_endpoint", originating_peer->get_remote_endpoint())
2404  ("synopsis", synopsis_sent_in_request)
2405  ("first_block", first_item_hash));
2406  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks following something in "
2407  "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices",
2408  ("synopsis", synopsis_sent_in_request)
2409  ("first_block", first_item_hash)));
2410  disconnect_from_peer(originating_peer,
2411  "You gave an invalid response to my request for sync blocks",
2412  true, error_for_peer);
2413  return;
2414  }
2415  }
2416  }
2417  originating_peer->item_ids_requested_from_peer.reset();
2418 
2419  // if exceptions are throw after clearing the item_ids_requested_from_peer (above),
2420  // it could leave our sync in a stalled state. Wrap a try/catch around the rest
2421  // of the function so we can log if this ever happens.
2422  try
2423  {
2424  dlog( "sync: received a list of ${count} available items from ${peer_endpoint}",
2425  ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() )
2426  ( "peer_endpoint", originating_peer->get_remote_endpoint() ) );
2427  //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available )
2428  //{
2429  // dlog( "sync: ${hash}", ("hash", item_hash ) );
2430  //}
2431 
2432  // if the peer doesn't have any items after the one we asked for
2433  if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 &&
2434  ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that.
2435  ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 &&
2436  _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type,
2437  blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain
2438  originating_peer->ids_of_items_to_get.empty() &&
2439  originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary?
2440  {
2441  dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" );
2442  originating_peer->we_need_sync_items_from_peer = false;
2443 
2444  uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
2445  _total_num_of_unfetched_items = new_number_of_unfetched_items;
2446  if( new_number_of_unfetched_items == 0 )
2447  _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 );
2448 
2449  return;
2450  }
2451 
2452  std::deque<item_hash_t> item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(),
2453  blockchain_item_ids_inventory_message_received.item_hashes_available.end() );
2454  originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
2455  // flush any items this peer sent us that we've already received and processed from another peer
2456  if (!item_hashes_received.empty() &&
2457  originating_peer->ids_of_items_to_get.empty())
2458  {
2459  bool is_first_item_for_other_peer = false;
2460  {
2462  for (const peer_connection_ptr& peer : _active_connections)
2463  {
2464  if (peer != originating_peer->shared_from_this() &&
2465  !peer->ids_of_items_to_get.empty() &&
2466  peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
2467  {
2468  dlog("The item ${newitem} is the first item for peer ${peer}",
2469  ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
2470  ("peer", peer->get_remote_endpoint()));
2471  is_first_item_for_other_peer = true;
2472  break;
2473  }
2474  }
2475  }
2476  dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
2477  ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
2478  if (!is_first_item_for_other_peer)
2479  {
2480  while (!item_hashes_received.empty() &&
2481  _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type,
2482  item_hashes_received.front())))
2483  {
2484  assert(item_hashes_received.front() != item_hash_t());
2485  originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
2486  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
2487  dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})",
2488  ("peer", originating_peer->get_remote_endpoint())
2489  ("block_id", originating_peer->last_block_delegate_has_seen)
2490  ("actual_block_num", _delegate->get_block_number(item_hashes_received.front())));
2491 
2492  item_hashes_received.pop_front();
2493  }
2494  dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size()));
2495  }
2496  }
2497  else if (!item_hashes_received.empty())
2498  {
2499  // we received a list of items and we already have a list of items to fetch from this peer.
2500  // In the normal case, this list will immediately follow the existing list, meaning the
2501  // last hash of our existing list will match the first hash of the new list.
2502 
2503  // In the much less likely case, we've received a partial list of items from the peer, then
2504  // the peer switched forks before sending us the remaining list. In this case, the first
2505  // hash in the new list may not be the last hash in the existing list (it may be earlier, or
2506  // it may not exist at all.
2507 
2508  // In either case, pop items off the back of our existing list until we find our first
2509  // item, then append our list.
2510  while (!originating_peer->ids_of_items_to_get.empty())
2511  {
2512  if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back())
2513  originating_peer->ids_of_items_to_get.pop_back();
2514  else
2515  break;
2516  }
2517  if (originating_peer->ids_of_items_to_get.empty())
2518  {
2519  // this happens when the peer has switched forks between the last inventory message and
2520  // this one, and there weren't any unfetched items in common
2521  // We don't know where in the blockchain the new front() actually falls, all we can
2522  // expect is that it is a block that we knew about because it should be one of the
2523  // blocks we sent in the initial synopsis.
2524  assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front())));
2525  originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
2526  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
2527  item_hashes_received.pop_front();
2528  }
2529  else
2530  {
2531  // the common simple case: the new list extends the old. pop off the duplicate element
2532  originating_peer->ids_of_items_to_get.pop_back();
2533  }
2534  }
2535 
2536  if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty())
2537  assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back());
2538 
2539  // at any given time, there's a maximum number of blocks that can possibly be out there
2540  // [(now - genesis time) / block interval]. If they offer us more blocks than that,
2541  // they must be an attacker or have a buggy client.
2542  fc::time_point_sec minimum_time_of_last_offered_block =
2543  originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block
2546  if (minimum_time_of_last_offered_block > (now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC))
2547  {
2548  wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})",
2549  ("peer", originating_peer->get_remote_endpoint())
2550  ("timestamp", minimum_time_of_last_offered_block));
2551  fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}",
2552  ("blocks", originating_peer->number_of_unfetched_item_ids)
2553  ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block)
2554  ("now", now)));
2555  disconnect_from_peer(originating_peer,
2556  "You offered me a list of more sync blocks than could possibly exist",
2557  true, error_for_peer);
2558  return;
2559  }
2560 
2561  // append the remaining items to the peer's list
2562  boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received);
2563 
2564  uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
2565  if (new_number_of_unfetched_items != _total_num_of_unfetched_items)
2566  _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type,
2567  new_number_of_unfetched_items);
2568  _total_num_of_unfetched_items = new_number_of_unfetched_items;
2569 
2570  if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0)
2571  {
2572  // the peer hasn't sent us all the items it knows about.
2573  if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
2574  {
2575  // we have a good number of item ids from this peer, start fetching blocks from it;
2576  // we'll switch back later to finish the job.
2578  }
2579  else
2580  {
2581  // keep fetching the peer's list of sync items until we get enough to switch into block-
2582  // fetchimg mode
2583  fetch_next_batch_of_item_ids_from_peer(originating_peer);
2584  }
2585  }
2586  else
2587  {
2588  // the peer has told us about all of the items it knows
2589  if (!originating_peer->ids_of_items_to_get.empty())
2590  {
2591  // we now know about all of the items the peer knows about, and there are some items on the list
2592  // that we should try to fetch. Kick off the fetch loop.
2594  }
2595  else
2596  {
2597  // If we get here, the peer has sent us a non-empty list of items, but we have already
2598  // received all of the items from other peers. Send a new request to the peer to
2599  // see if we're really in sync
2600  fetch_next_batch_of_item_ids_from_peer(originating_peer);
2601  }
2602  }
2603  }
2604  catch (const fc::canceled_exception&)
2605  {
2606  throw;
2607  }
2608  catch (const fc::exception& e)
2609  {
2610  elog("Caught unexpected exception: ${e}", ("e", e));
2611  assert(false && "exceptions not expected here");
2612  }
2613  catch (const std::exception& e)
2614  {
2615  elog("Caught unexpected exception: ${e}", ("e", e.what()));
2616  assert(false && "exceptions not expected here");
2617  }
2618  catch (...)
2619  {
2620  elog("Caught unexpected exception, could break sync operation");
2621  }
2622  }
2623  else
2624  {
2625  wlog( "sync: received a list of sync items available from peer ${peer}, but I didn't ask for any!",
2626  ("peer", originating_peer->get_remote_endpoint()) );
2627  }
2628  }
2629 
2631  {
2632  try
2633  {
2634  return _message_cache.get_message(item.item_hash);
2635  }
2636  catch (fc::key_not_found_exception&)
2637  {}
2638  try
2639  {
2640  return _delegate->get_item(item);
2641  }
2642  catch (fc::key_not_found_exception&)
2643  {}
2644  return item_not_available_message(item);
2645  }
2646 
2648  const fetch_items_message& fetch_items_message_received)
2649  {
2651  // Gatekeeping code
2653  {
2654  wlog( "Unexpected fetch_items_message from peer ${peer}, disconnecting",
2655  ("peer", originating_peer->get_remote_endpoint()) );
2656  disconnect_from_peer( originating_peer, "Received an unexpected fetch_items_message" );
2657  return;
2658  }
2659 
2660  dlog("received items request for ids ${ids} of type ${type} from peer ${endpoint}",
2661  ("ids", fetch_items_message_received.items_to_fetch)
2662  ("type", fetch_items_message_received.item_type)
2663  ("endpoint", originating_peer->get_remote_endpoint()));
2664 
2665  fc::optional<message> last_block_message_sent;
2666 
2667  std::list<message> reply_messages;
2668  for (const item_hash_t& item_hash : fetch_items_message_received.items_to_fetch)
2669  {
2670  try
2671  {
2672  message requested_message = _message_cache.get_message(item_hash);
2673  dlog("received item request for item ${id} from peer ${endpoint}, returning the item from my message cache",
2674  ("endpoint", originating_peer->get_remote_endpoint())
2675  ("id", requested_message.id()));
2676  reply_messages.push_back(requested_message);
2677  if (fetch_items_message_received.item_type == block_message_type)
2678  last_block_message_sent = requested_message;
2679  continue;
2680  }
2681  catch (fc::key_not_found_exception&)
2682  {
2683  // it wasn't in our local cache, that's ok ask the client
2684  }
2685 
2686  item_id item_to_fetch(fetch_items_message_received.item_type, item_hash);
2687  try
2688  {
2689  message requested_message = _delegate->get_item(item_to_fetch);
2690  dlog("received item request from peer ${endpoint}, returning the item from delegate with id ${id} size ${size}",
2691  ("id", requested_message.id())
2692  ("size", requested_message.size)
2693  ("endpoint", originating_peer->get_remote_endpoint()));
2694  reply_messages.push_back(requested_message);
2695  if (fetch_items_message_received.item_type == block_message_type)
2696  last_block_message_sent = requested_message;
2697  continue;
2698  }
2699  catch (fc::key_not_found_exception&)
2700  {
2701  reply_messages.push_back(item_not_available_message(item_to_fetch));
2702  dlog("received item request from peer ${endpoint} but we don't have it",
2703  ("endpoint", originating_peer->get_remote_endpoint()));
2704  }
2705  }
2706 
2707  // if we sent them a block, update our record of the last block they've seen accordingly
2708  if (last_block_message_sent)
2709  {
2710  graphene::net::block_message block = last_block_message_sent->as<graphene::net::block_message>();
2711  originating_peer->last_block_delegate_has_seen = block.block_id;
2712  originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(block.block_id);
2713  }
2714 
2715  for (const message& reply : reply_messages)
2716  {
2717  if (reply.msg_type.value() == block_message_type)
2719  else
2720  originating_peer->send_message(reply);
2721  }
2722  }
2723 
2724  void node_impl::on_item_not_available_message( peer_connection* originating_peer, const item_not_available_message& item_not_available_message_received )
2725  {
2727  const item_id& requested_item = item_not_available_message_received.requested_item;
2728  auto regular_item_iter = originating_peer->items_requested_from_peer.find(requested_item);
2729  if (regular_item_iter != originating_peer->items_requested_from_peer.end())
2730  {
2731  originating_peer->items_requested_from_peer.erase( regular_item_iter );
2732  originating_peer->inventory_peer_advertised_to_us.erase( requested_item );
2733  if (is_item_in_any_peers_inventory(requested_item))
2734  {
2737  }
2738  wlog( "Peer ${peer} doesn't have the requested item ${item}.",
2739  ("peer", originating_peer->get_remote_endpoint())
2740  ("item", requested_item) );
2742  return;
2743  }
2744 
2745  auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find(requested_item.item_hash);
2746  if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
2747  {
2748  _active_sync_requests.erase(*sync_item_iter);
2749  originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
2750 
2751  if (originating_peer->peer_needs_sync_items_from_us)
2752  originating_peer->inhibit_fetching_sync_blocks = true;
2753  else
2754  disconnect_from_peer(originating_peer, "You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",true,
2755  fc::exception(FC_LOG_MESSAGE(error,"You are missing a sync item you claim to have, your database is probably corrupted. Try --rebuild-index.",
2756  ("item_id", requested_item))));
2757  wlog( "Peer ${peer} doesn't have the requested sync item ${item}. This really shouldn't happen",
2758  ("peer", originating_peer->get_remote_endpoint())
2759  ("item", requested_item) );
2761  return;
2762  }
2763 
2764  dlog("Peer doesn't have an item we're looking for, which is fine because we weren't looking for it");
2765  }
2766 
2767  void node_impl::on_item_ids_inventory_message(peer_connection* originating_peer, const item_ids_inventory_message& item_ids_inventory_message_received)
2768  {
2770  // Gatekeeping code
2772  {
2773  wlog( "Unexpected item_ids_inventory_message from peer ${peer}, disconnecting",
2774  ("peer", originating_peer->get_remote_endpoint()) );
2775  disconnect_from_peer( originating_peer, "Received an unexpected item_ids_inventory_message" );
2776  return;
2777  }
2778 
2779  // expire old inventory
2780  // so we'll be making our decisions about whether to fetch blocks below based only on recent inventory
2781  originating_peer->clear_old_inventory();
2782 
2783  dlog( "received inventory of ${count} items from peer ${endpoint}",
2784  ("count", item_ids_inventory_message_received.item_hashes_available.size())
2785  ("endpoint", originating_peer->get_remote_endpoint() ) );
2786  for( const item_hash_t& item_hash : item_ids_inventory_message_received.item_hashes_available )
2787  {
2788  item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash);
2789  bool we_advertised_this_item_to_a_peer = false;
2790  bool we_requested_this_item_from_a_peer = false;
2791  {
2793  for (const peer_connection_ptr& peer : _active_connections)
2794  {
2795  if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end())
2796  {
2797  we_advertised_this_item_to_a_peer = true;
2798  break;
2799  }
2800  if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end())
2801  we_requested_this_item_from_a_peer = true;
2802  }
2803  }
2804 
2805  // if we have already advertised it to a peer, we must have it, no need to do anything else
2806  if (!we_advertised_this_item_to_a_peer)
2807  {
2808  // if the peer has flooded us with transactions, don't add these to the inventory to prevent our
2809  // inventory list from growing without bound. We try to allow fetching blocks even when
2810  // we've stopped fetching transactions.
2811  if ((item_ids_inventory_message_received.item_type == graphene::net::trx_message_type &&
2813  originating_peer->is_inventory_advertised_to_us_list_full())
2814  break;
2815  originating_peer->inventory_peer_advertised_to_us.insert(peer_connection::timestamped_item_id(advertised_item_id, fc::time_point::now()));
2816  if (!we_requested_this_item_from_a_peer)
2817  {
2818  if (_recently_failed_items.find(item_id(item_ids_inventory_message_received.item_type, item_hash)) != _recently_failed_items.end())
2819  {
2820  dlog("not adding ${item_hash} to our list of items to fetch because we've recently fetched a copy and it failed to push",
2821  ("item_hash", item_hash));
2822  }
2823  else
2824  {
2825  auto items_to_fetch_iter = _items_to_fetch.get<item_id_index>().find(advertised_item_id);
2826  if (items_to_fetch_iter == _items_to_fetch.get<item_id_index>().end())
2827  {
2828  // it's new to us
2831  dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
2832  ("item_hash", item_hash));
2834  }
2835  else
2836  {
2837  // another peer has told us about this item already, but this peer just told us it has the item
2838  // too, we can expect it to be around in this peer's cache for longer, so update its timestamp
2839  _items_to_fetch.get<item_id_index>().modify(items_to_fetch_iter,
2840  [](prioritized_item_id& item) { item.timestamp = fc::time_point::now(); });
2841  }
2842  }
2843  }
2844  }
2845  }
2846 
2847  }
2848 
2850  const closing_connection_message& closing_connection_message_received )
2851  {
2853  originating_peer->they_have_requested_close = true;
2854 
2855  if( closing_connection_message_received.closing_due_to_error )
2856  {
2857  wlog( "Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
2858  ( "peer", originating_peer->get_remote_endpoint() )
2859  ( "msg", closing_connection_message_received.reason_for_closing )
2860  ( "error", closing_connection_message_received.error ) );
2861  std::ostringstream message;
2862  message << "Peer " << fc::variant( originating_peer->get_remote_endpoint(),
2864  " disconnected us: " << closing_connection_message_received.reason_for_closing;
2865  fc::exception detailed_error(FC_LOG_MESSAGE(warn,
2866  "Peer ${peer} is disconnecting us because of an error: ${msg}, exception: ${error}",
2867  ( "peer", originating_peer->get_remote_endpoint() )
2868  ( "msg", closing_connection_message_received.reason_for_closing )
2869  ( "error", closing_connection_message_received.error ) ));
2870  _delegate->error_encountered( message.str(),
2871  detailed_error );
2872  }
2873  else
2874  {
2875  wlog( "Peer ${peer} is disconnecting us because: ${msg}",
2876  ( "peer", originating_peer->get_remote_endpoint() )
2877  ( "msg", closing_connection_message_received.reason_for_closing ) );
2878  }
2879  if( originating_peer->we_have_requested_close )
2880  originating_peer->close_connection();
2881  }
2882 
2884  {
2886  peer_connection_ptr originating_peer_ptr = originating_peer->shared_from_this();
2887  _rate_limiter.remove_tcp_socket( &originating_peer->get_socket() );
2888 
2889  // if we closed the connection (due to timeout or handshake failure), we should have recorded an
2890  // error message to store in the peer database when we closed the connection
2891  fc::optional<fc::ip::endpoint> inbound_endpoint = originating_peer->get_endpoint_for_connecting();
2892  if( originating_peer->connection_closed_error
2893  && inbound_endpoint.valid() && inbound_endpoint->port() != 0 )
2894  {
2895  fc::optional<potential_peer_record> updated_peer_record
2896  = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
2897  if (updated_peer_record)
2898  {
2899  updated_peer_record->last_error = *originating_peer->connection_closed_error;
2900  _potential_peer_db.update_entry(*updated_peer_record);
2901  }
2902  }
2903 
2904  _closing_connections.erase(originating_peer_ptr);
2905  _handshaking_connections.erase(originating_peer_ptr);
2906  _terminating_connections.erase(originating_peer_ptr);
2907  if (_active_connections.find(originating_peer_ptr) != _active_connections.end())
2908  {
2909  _active_connections.erase(originating_peer_ptr);
2910 
2911  update_address_seen_time( this, originating_peer );
2912  }
2913 
2914  ilog("Remote peer ${endpoint} closed their connection to us",
2915  ("endpoint", originating_peer->get_remote_endpoint()));
2918 
2919  // notify the node delegate so it can update the display
2921  {
2923  _delegate->connection_count_changed( _last_reported_number_of_conns );
2924  }
2925 
2926  // if we had requested any sync or regular items from this peer that we haven't
2927  // received yet, reschedule them to be fetched from another peer
2928  if (!originating_peer->sync_items_requested_from_peer.empty())
2929  {
2930  for (auto sync_item : originating_peer->sync_items_requested_from_peer)
2931  _active_sync_requests.erase(sync_item);
2933  }
2934 
2935  if (!originating_peer->items_requested_from_peer.empty())
2936  {
2937  for (auto item_and_time : originating_peer->items_requested_from_peer)
2938  {
2939  if (is_item_in_any_peers_inventory(item_and_time.first))
2940  {
2941  _items_to_fetch.insert(prioritized_item_id(item_and_time.first, _items_to_fetch_seq_counter));
2943  }
2944  }
2946  }
2947 
2948  schedule_peer_for_deletion(originating_peer_ptr);
2949  }
2950 
2952  {
2953  dlog("in send_sync_block_to_node_delegate()");
2954  bool client_accepted_block = false;
2955  bool discontinue_fetching_blocks_from_peer = false;
2956 
2957  fc::oexception handle_message_exception;
2958 
2959  try
2960  {
2961  std::vector<message_hash_type> contained_transaction_msg_ids;
2962  _delegate->handle_block(block_message_to_send, true, contained_transaction_msg_ids);
2963  dlog("Successfully pushed sync block ${num} (id:${id})",
2964  ("num", block_message_to_send.block.block_num())
2965  ("id", block_message_to_send.block_id));
2966  _most_recent_blocks_accepted.push_back(block_message_to_send.block_id);
2967 
2968  client_accepted_block = true;
2969  }
2970  catch (const block_older_than_undo_history& e)
2971  {
2972  wlog("Failed to push sync block ${num} (id:${id}): block is on a fork older than our undo history would "
2973  "allow us to switch to: ${e}",
2974  ("num", block_message_to_send.block.block_num())
2975  ("id", block_message_to_send.block_id)
2976  ("e", (fc::exception)e));
2977  handle_message_exception = e;
2978  discontinue_fetching_blocks_from_peer = true;
2979  }
2980  catch (const fc::canceled_exception&)
2981  {
2982  throw;
2983  }
2984  catch (const fc::exception& e)
2985  {
2986  auto block_num = block_message_to_send.block.block_num();
2987  wlog("Failed to push sync block ${num} (id:${id}): client rejected sync block sent by peer: ${e}",
2988  ("num", block_num)
2989  ("id", block_message_to_send.block_id)
2990  ("e", e));
2991  if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
2992  {
2993  handle_message_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
2994  ("block_header", static_cast<graphene::protocol::block_header>(block_message_to_send.block))
2995  ("block_num", block_num)
2996  ("block_id", block_message_to_send.block_id) ) );
2997  }
2998  else
2999  handle_message_exception = e;
3000  }
3001 
3002  // build up lists for any potentially-blocking operations we need to do, then do them
3003  // at the end of this function
3004  std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
3005  std::set<peer_connection_ptr> peers_we_need_to_sync_to;
3006  std::map<peer_connection_ptr, std::pair<std::string, fc::oexception> > peers_to_disconnect; // map peer -> pair<reason_string, exception>
3007 
3008  if( client_accepted_block )
3009  {
3011  dlog("sync: client accpted the block, we now have only ${count} items left to fetch before we're in sync",
3012  ("count", _total_num_of_unfetched_items));
3013  bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num());
3014  {
3016 
3017  for (const peer_connection_ptr& peer : _active_connections)
3018  {
3019  bool disconnecting_this_peer = false;
3020  if (is_fork_block)
3021  {
3022  // we just pushed a hard fork block. Find out if this peer is running a client
3023  // that will be unable to process future blocks
3024  if (peer->last_known_fork_block_number != 0)
3025  {
3026  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number);
3027  if (next_fork_block_number != 0 &&
3028  next_fork_block_number <= block_message_to_send.block.block_num())
3029  {
3030  std::ostringstream disconnect_reason_stream;
3031  disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_message_to_send.block.block_num();
3032  peers_to_disconnect[peer] = std::make_pair(disconnect_reason_stream.str(),
3033  fc::oexception(fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
3034  ("block_number", block_message_to_send.block.block_num())))));
3035 #ifdef ENABLE_DEBUG_ULOGS
3036  ulog("Disconnecting from peer during sync because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
3037 #endif
3038  disconnecting_this_peer = true;
3039  }
3040  }
3041  }
3042  if (!disconnecting_this_peer &&
3043  peer->ids_of_items_to_get.empty() && peer->ids_of_items_being_processed.empty())
3044  {
3045  dlog( "Cannot pop first element off peer ${peer}'s list, its list is empty", ("peer", peer->get_remote_endpoint() ) );
3046  // we don't know for sure that this peer has the item we just received.
3047  // If peer is still syncing to us, we know they will ask us for
3048  // sync item ids at least one more time and we'll notify them about
3049  // the item then, so there's no need to do anything. If we still need items
3050  // from them, we'll be asking them for more items at some point, and
3051  // that will clue them in that they are out of sync. If we're fully in sync
3052  // we need to kick off another round of synchronization with them so they can
3053  // find out about the new item.
3054  if (!peer->peer_needs_sync_items_from_us && !peer->we_need_sync_items_from_peer)
3055  {
3056  dlog("We will be restarting synchronization with peer ${peer}", ("peer", peer->get_remote_endpoint()));
3057  peers_we_need_to_sync_to.insert(peer);
3058  }
3059  }
3060  else if (!disconnecting_this_peer)
3061  {
3062  auto items_being_processed_iter = peer->ids_of_items_being_processed.find(block_message_to_send.block_id);
3063  if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
3064  {
3065  peer->last_block_delegate_has_seen = block_message_to_send.block_id;
3066  peer->last_block_time_delegate_has_seen = block_message_to_send.block.timestamp;
3067 
3068  peer->ids_of_items_being_processed.erase(items_being_processed_iter);
3069  dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
3070  ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
3071 
3072  // if we just received the last item in our list from this peer, we will want to
3073  // send another request to find out if we are in sync, but we can't do this yet
3074  // (we don't want to allow a fiber swap in the middle of popping items off the list)
3075  if (peer->ids_of_items_to_get.empty() &&
3076  peer->number_of_unfetched_item_ids == 0 &&
3077  peer->ids_of_items_being_processed.empty())
3078  peers_with_newly_empty_item_lists.insert(peer);
3079 
3080  // in this case, we know the peer was offering us this exact item, no need to
3081  // try to inform them of its existence
3082  }
3083  }
3084  } // for
3085  } // lock_guard
3086  }
3087  else
3088  {
3089  // invalid message received
3091  for (const peer_connection_ptr& peer : _active_connections)
3092  {
3093  if (peer->ids_of_items_being_processed.find(block_message_to_send.block_id)
3094  != peer->ids_of_items_being_processed.end())
3095  {
3096  if (discontinue_fetching_blocks_from_peer)
3097  {
3098  wlog("inhibiting fetching sync blocks from peer ${endpoint} because it is on a fork that's too old",
3099  ("endpoint", peer->get_remote_endpoint()));
3100  peer->inhibit_fetching_sync_blocks = true;
3101  }
3102  else
3103  peers_to_disconnect[peer] = std::make_pair(
3104  std::string("You offered us a block that we reject as invalid"),
3105  fc::oexception(handle_message_exception));
3106  }
3107  }
3108  }
3109 
3110  for (auto& peer_to_disconnect : peers_to_disconnect)
3111  {
3112  const peer_connection_ptr& peer = peer_to_disconnect.first;
3113  std::string reason_string;
3114  fc::oexception reason_exception;
3115  std::tie(reason_string, reason_exception) = peer_to_disconnect.second;
3116  wlog("disconnecting client ${endpoint} because it offered us the rejected block",
3117  ("endpoint", peer->get_remote_endpoint()));
3118  disconnect_from_peer(peer.get(), reason_string, true, reason_exception);
3119  }
3120  for (const peer_connection_ptr& peer : peers_with_newly_empty_item_lists)
3122 
3123  for (const peer_connection_ptr& peer : peers_we_need_to_sync_to)
3125 
3126  dlog("Leaving send_sync_block_to_node_delegate");
3127 
3128  if (// _suspend_fetching_sync_blocks && <-- you can use this if
3129  // "max_blocks_to_handle_at_once" == "max_sync_blocks_to_prefetch"
3133  "process_backlog_of_sync_blocks");
3134  }
3135 
3137  {
3139  // garbage-collect the list of async tasks here for lack of a better place
3140  for (auto calls_iter = _handle_message_calls_in_progress.begin();
3141  calls_iter != _handle_message_calls_in_progress.end();)
3142  {
3143  if (calls_iter->ready())
3144  calls_iter = _handle_message_calls_in_progress.erase(calls_iter);
3145  else
3146  ++calls_iter;
3147  }
3148 
3149  dlog("in process_backlog_of_sync_blocks");
3151  {
3152  dlog("leaving process_backlog_of_sync_blocks because we're already processing too many blocks");
3153  return; // we will be rescheduled when the next block finishes its processing
3154  }
3155  dlog("currently ${count} blocks in the process of being handled", ("count", _handle_message_calls_in_progress.size()));
3156 
3157 
3159  {
3160  dlog("resuming processing sync block backlog because we only ${count} blocks in progress",
3161  ("count", _handle_message_calls_in_progress.size()));
3163  }
3164 
3165 
3166  // when syncing with multiple peers, it's possible that we'll have hundreds of blocks ready to push
3167  // to the client at once. This can be slow, and we need to limit the number we push at any given
3168  // time to allow network traffic to continue so we don't end up disconnecting from peers
3169  //fc::time_point start_time = fc::time_point::now();
3170  //fc::time_point when_we_should_yield = start_time + fc::seconds(1);
3171 
3172  bool block_processed_this_iteration;
3173  size_t blocks_processed = 0;
3174 
3175  std::set<peer_connection_ptr> peers_with_newly_empty_item_lists;
3176  std::set<peer_connection_ptr> peers_we_need_to_sync_to;
3177  std::map<peer_connection_ptr, fc::oexception> peers_with_rejected_block;
3178 
3179  do
3180  {
3181  std::copy(std::make_move_iterator(_new_received_sync_items.begin()),
3182  std::make_move_iterator(_new_received_sync_items.end()),
3183  std::front_inserter(_received_sync_items));
3184  _new_received_sync_items.clear();
3185  dlog("currently ${count} sync items to consider", ("count", _received_sync_items.size()));
3186 
3187  block_processed_this_iteration = false;
3188  for (auto received_block_iter = _received_sync_items.begin();
3189  received_block_iter != _received_sync_items.end();
3190  ++received_block_iter)
3191  {
3192 
3193  // find out if this block is the next block on the active chain or one of the forks
3194  bool potential_first_block = false;
3195  {
3197  for (const peer_connection_ptr& peer : _active_connections)
3198  {
3199  if (!peer->ids_of_items_to_get.empty() &&
3200  peer->ids_of_items_to_get.front() == received_block_iter->block_id)
3201  {
3202  potential_first_block = true;
3203  peer->ids_of_items_to_get.pop_front();
3204  peer->ids_of_items_being_processed.insert(received_block_iter->block_id);
3205  }
3206  }
3207  }
3208 
3209  // if it is, process it, remove it from all sync peers lists
3210  if (potential_first_block)
3211  {
3212  // we can get into an interesting situation near the end of synchronization. We can be in
3213  // sync with one peer who is sending us the last block on the chain via a regular inventory
3214  // message, while at the same time still be synchronizing with a peer who is sending us the
3215  // block through the sync mechanism. Further, we must request both blocks because
3216  // we don't know they're the same (for the peer in normal operation, it has only told us the
3217  // message id, for the peer in the sync case we only known the block_id).
3218  if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
3219  received_block_iter->block_id) == _most_recent_blocks_accepted.end())
3220  {
3221  graphene::net::block_message block_message_to_process = *received_block_iter;
3222  _received_sync_items.erase(received_block_iter);
3223  _handle_message_calls_in_progress.emplace_back(fc::async([this, block_message_to_process](){
3224  send_sync_block_to_node_delegate(block_message_to_process);
3225  }, "send_sync_block_to_node_delegate"));
3226  ++blocks_processed;
3227  block_processed_this_iteration = true;
3228  }
3229  else
3230  {
3231  dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
3232  std::vector< peer_connection_ptr > peers_needing_next_batch;
3234  for (const peer_connection_ptr& peer : _active_connections)
3235  {
3236  auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
3237  if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
3238  {
3239  peer->ids_of_items_being_processed.erase(items_being_processed_iter);
3240  dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
3241  ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
3242 
3243  // if we just processed the last item in our list from this peer, we will want to
3244  // send another request to find out if we are now in sync (this is normally handled in
3245  // send_sync_block_to_node_delegate)
3246  if (peer->ids_of_items_to_get.empty() &&
3247  peer->number_of_unfetched_item_ids == 0 &&
3248  peer->ids_of_items_being_processed.empty())
3249  {
3250  dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
3251  peers_needing_next_batch.push_back( peer );
3252  }
3253  }
3254  }
3255  for( const peer_connection_ptr& peer : peers_needing_next_batch )
3257  }
3258 
3259  break; // start iterating _received_sync_items from the beginning
3260  } // end if potential_first_block
3261  } // end for each block in _received_sync_items
3262 
3264  {
3265  dlog("stopping processing sync block backlog because we have ${count} blocks in progress",
3266  ("count", _handle_message_calls_in_progress.size()));
3267  //ulog("stopping processing sync block backlog because we have ${count} blocks in progress, total on hand: ${received}",
3268  // ("count", _handle_message_calls_in_progress.size())("received", _received_sync_items.size()));
3271  break;
3272  }
3273  } while (block_processed_this_iteration);
3274 
3275  dlog("leaving process_backlog_of_sync_blocks, ${count} processed", ("count", blocks_processed));
3276 
3279  }
3280 
3282  {
3283  if (!_node_is_shutting_down &&
3286  "process_backlog_of_sync_blocks" );
3287  }
3288 
3290  const graphene::net::block_message& block_message_to_process,
3291  const message_hash_type& )
3292  {
3294  dlog( "received a sync block from peer ${endpoint}", ("endpoint", originating_peer->get_remote_endpoint() ) );
3295 
3296  // add it to the front of _received_sync_items, then process _received_sync_items to try to
3297  // pass as many messages as possible to the client.
3298  _new_received_sync_items.push_front( block_message_to_process );
3300  }
3301 
3303  const graphene::net::block_message& block_message_to_process,
3304  const message_hash_type& message_hash )
3305  {
3306  fc::time_point message_receive_time = fc::time_point::now();
3307 
3308  dlog( "received a block from peer ${endpoint}, passing it to client",
3309  ("endpoint", originating_peer->get_remote_endpoint() ) );
3310  std::set<peer_connection_ptr> peers_to_disconnect;
3311  std::string disconnect_reason;
3312  fc::oexception disconnect_exception;
3313  fc::oexception restart_sync_exception;
3314  try
3315  {
3316  // we can get into an intersting situation near the end of synchronization. We can be in
3317  // sync with one peer who is sending us the last block on the chain via a regular inventory
3318  // message, while at the same time still be synchronizing with a peer who is sending us the
3319  // block through the sync mechanism. Further, we must request both blocks because
3320  // we don't know they're the same (for the peer in normal operation, it has only told us the
3321  // message id, for the peer in the sync case we only known the block_id).
3322  fc::time_point message_validated_time;
3323  if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(),
3324  block_message_to_process.block_id) == _most_recent_blocks_accepted.end())
3325  {
3326  std::vector<message_hash_type> contained_transaction_msg_ids;
3327  _delegate->handle_block(block_message_to_process, false, contained_transaction_msg_ids);
3328  message_validated_time = fc::time_point::now();
3329  dlog("Successfully pushed block ${num} (id:${id})",
3330  ("num", block_message_to_process.block.block_num())
3331  ("id", block_message_to_process.block_id));
3332  _most_recent_blocks_accepted.push_back(block_message_to_process.block_id);
3333 
3334  bool new_transaction_discovered = false;
3335  for (const item_hash_t& transaction_message_hash : contained_transaction_msg_ids)
3336  {
3337  /*size_t items_erased =*/
3338  _items_to_fetch.get<item_id_index>().erase(item_id(trx_message_type, transaction_message_hash));
3339  // there are two ways we could behave here: we could either act as if we received
3340  // the transaction outside the block and offer it to our peers, or we could just
3341  // forget about it (we would still advertise this block to our peers so they should
3342  // get the transaction through that mechanism).
3343  // We take the second approach, bring in the next if block to try the first approach
3344  //if (items_erased)
3345  //{
3346  // new_transaction_discovered = true;
3347  // _new_inventory.insert(item_id(trx_message_type, transaction_message_hash));
3348  //}
3349  }
3350  if (new_transaction_discovered)
3352  }
3353  else
3354  dlog( "Already received and accepted this block (presumably through sync mechanism), treating it as accepted" );
3355 
3356  dlog( "client validated the block, advertising it to other peers" );
3357 
3358  item_id block_message_item_id(core_message_type_enum::block_message_type, message_hash);
3359  uint32_t block_number = block_message_to_process.block.block_num();
3360  fc::time_point_sec block_time = block_message_to_process.block.timestamp;
3361  {
3363  for (const peer_connection_ptr& peer : _active_connections)
3364  {
3365  auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id);
3366  if (iter != peer->inventory_peer_advertised_to_us.end())
3367  {
3368  // this peer offered us the item. It will eventually expire from the peer's
3369  // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes).
3370  // For now, it will remain there, which will prevent us from offering the peer this
3371  // block back when we rebroadcast the block below
3372  peer->last_block_delegate_has_seen = block_message_to_process.block_id;
3373  peer->last_block_time_delegate_has_seen = block_time;
3374  }
3375  peer->clear_old_inventory();
3376  }
3377  }
3378  message_propagation_data propagation_data { message_receive_time, message_validated_time,
3379  originating_peer->node_id };
3380  broadcast( block_message_to_process, propagation_data );
3382 
3383  if (is_hard_fork_block(block_number))
3384  {
3385  // we just pushed a hard fork block. Find out if any of our peers are running clients
3386  // that will be unable to process future blocks
3388  for (const peer_connection_ptr& peer : _active_connections)
3389  {
3390  if (peer->last_known_fork_block_number != 0)
3391  {
3392  uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number);
3393  if (next_fork_block_number != 0 &&
3394  next_fork_block_number <= block_number)
3395  {
3396  peers_to_disconnect.insert(peer);
3397 #ifdef ENABLE_DEBUG_ULOGS
3398  ulog("Disconnecting from peer because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp));
3399 #endif
3400  }
3401  }
3402  }
3403  if (!peers_to_disconnect.empty())
3404  {
3405  std::ostringstream disconnect_reason_stream;
3406  disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_number;
3407  disconnect_reason = disconnect_reason_stream.str();
3408  disconnect_exception = fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}",
3409  ("block_number", block_number)));
3410  }
3411  }
3412  }
3413  catch (const fc::canceled_exception&)
3414  {
3415  throw;
3416  }
3417  catch (const unlinkable_block_exception& e)
3418  {
3419  restart_sync_exception = e;
3420  }
3421  catch (const fc::exception& e)
3422  {
3423  // client rejected the block. Disconnect the client and any other clients that offered us this block
3424  auto block_num = block_message_to_process.block.block_num();
3425  wlog("Failed to push block ${num} (id:${id}), client rejected block sent by peer: ${e}",
3426  ("num", block_num)
3427  ("id", block_message_to_process.block_id)
3428  ("e",e));
3429 
3430  if( e.code() == block_timestamp_in_future_exception::code_enum::code_value )
3431  {
3432  disconnect_exception = block_timestamp_in_future_exception( FC_LOG_MESSAGE( warn, "",
3433  ("block_header", static_cast<graphene::protocol::block_header>(block_message_to_process.block))
3434  ("block_num", block_num)
3435  ("block_id", block_message_to_process.block_id) ) );
3436  }
3437  else
3438  disconnect_exception = e;
3439  disconnect_reason = "You offered me a block that I have deemed to be invalid";
3440 
3441  peers_to_disconnect.insert( originating_peer->shared_from_this() );
3443  for (const peer_connection_ptr& peer : _active_connections)
3444  if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id)
3445  peers_to_disconnect.insert(peer);
3446  }
3447 
3448  if (restart_sync_exception)
3449  {
3450  wlog("Peer ${peer} sent me a block that didn't link to our blockchain. Restarting sync mode with them to get the missing block. "
3451  "Error pushing block was: ${e}",
3452  ("peer", originating_peer->get_remote_endpoint())
3453  ("e", *restart_sync_exception));
3454  start_synchronizing_with_peer(originating_peer->shared_from_this());
3455  }
3456 
3457  for (const peer_connection_ptr& peer : peers_to_disconnect)
3458  {
3459  wlog("disconnecting client ${endpoint} because it offered us the rejected block",
3460  ("endpoint", peer->get_remote_endpoint()));
3461  disconnect_from_peer(peer.get(), disconnect_reason, true, *disconnect_exception);
3462  }
3463  }
3465  const message& message_to_process,
3466  const message_hash_type& message_hash)
3467  {
3469  // find out whether we requested this item while we were synchronizing or during normal operation
3470  // (it's possible that we request an item during normal operation and then get kicked into sync
3471  // mode before we receive and process the item. In that case, we should process the item as a normal
3472  // item to avoid confusing the sync code)
3473  graphene::net::block_message block_message_to_process(message_to_process.as<graphene::net::block_message>());
3474  auto item_iter = originating_peer->items_requested_from_peer.find(
3476  if (item_iter != originating_peer->items_requested_from_peer.end())
3477  {
3478  originating_peer->items_requested_from_peer.erase(item_iter);
3479  process_block_when_in_sync(originating_peer, block_message_to_process, message_hash);
3480  if (originating_peer->idle())
3482  return;
3483  }
3484  else
3485  {
3486  // not during normal operation. see if we requested it during sync
3487  auto sync_item_iter = originating_peer->sync_items_requested_from_peer.find( block_message_to_process.block_id);
3488  if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
3489  {
3490  originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
3491  // if exceptions are throw here after removing the sync item from the list (above),
3492  // it could leave our sync in a stalled state. Wrap a try/catch around the rest
3493  // of the function so we can log if this ever happens.
3494  try
3495  {
3496  originating_peer->last_sync_item_received_time = fc::time_point::now();
3497  _active_sync_requests.erase(block_message_to_process.block_id);
3498  process_block_during_syncing(originating_peer, block_message_to_process, message_hash);
3499  if (originating_peer->idle())
3500  {
3501  // we have finished fetching a batch of items, so we either need to grab another batch of items
3502  // or we need to get another list of item ids.
3503  if (originating_peer->number_of_unfetched_item_ids > 0 &&
3505  fetch_next_batch_of_item_ids_from_peer(originating_peer);
3506  else
3508  }
3509  return;
3510  }
3511  catch (const fc::canceled_exception& e)
3512  {
3513  throw;
3514  }
3515  catch (const fc::exception& e)
3516  {
3517  elog("Caught unexpected exception: ${e}", ("e", e));
3518  assert(false && "exceptions not expected here");
3519  }
3520  catch (const std::exception& e)
3521  {
3522  elog("Caught unexpected exception: ${e}", ("e", e.what()));
3523  assert(false && "exceptions not expected here");
3524  }
3525  catch (...)
3526  {
3527  elog("Caught unexpected exception, could break sync operation");
3528  }
3529  }
3530  }
3531 
3532  // if we get here, we didn't request the message, we must have a misbehaving peer
3533  wlog("received a block ${block_id} I didn't ask for from peer ${endpoint}, disconnecting from peer",
3534  ("endpoint", originating_peer->get_remote_endpoint())
3535  ("block_id", block_message_to_process.block_id));
3536  fc::exception detailed_error(FC_LOG_MESSAGE(error, "You sent me a block that I didn't ask for, block_id: ${block_id}",
3537  ("block_id", block_message_to_process.block_id)
3538  ("graphene_git_revision_sha", originating_peer->graphene_git_revision_sha)
3539  ("graphene_git_revision_unix_timestamp", originating_peer->graphene_git_revision_unix_timestamp)
3540  ("fc_git_revision_sha", originating_peer->fc_git_revision_sha)
3541  ("fc_git_revision_unix_timestamp", originating_peer->fc_git_revision_unix_timestamp)));
3542  disconnect_from_peer(originating_peer, "You sent me a block that I didn't ask for", true, detailed_error);
3543  }
3544 
3546  const current_time_request_message& current_time_request_message_received)
3547  {
3549  fc::time_point request_received_time(fc::time_point::now());
3550  current_time_reply_message reply(current_time_request_message_received.request_sent_time,
3551  request_received_time);
3552  originating_peer->send_message(reply, offsetof(current_time_reply_message, reply_transmitted_time));
3553  }
3554 
3556  const current_time_reply_message& current_time_reply_message_received)
3557  {
3559  fc::time_point reply_received_time = fc::time_point::now();
3560  constexpr uint8_t two = 2;
3561  originating_peer->clock_offset = fc::microseconds( ( (current_time_reply_message_received.request_received_time
3562  - current_time_reply_message_received.request_sent_time)
3563  + (current_time_reply_message_received.reply_transmitted_time
3564  - reply_received_time) ).count() / two );
3565  originating_peer->round_trip_delay = ( reply_received_time
3566  - current_time_reply_message_received.request_sent_time )
3567  - ( current_time_reply_message_received.reply_transmitted_time
3568  - current_time_reply_message_received.request_received_time );
3569  }
3570 
3571  // this handles any message we get that doesn't require any special processing.
3572  // currently, this is any message other than block messages and p2p-specific
3573  // messages. (transaction messages would be handled here, for example)
3574  // this just passes the message to the client, and does the bookkeeping
3575  // related to requesting and rebroadcasting the message.
3577  const message& message_to_process,
3578  const message_hash_type& message_hash )
3579  {
3581  fc::time_point message_receive_time = fc::time_point::now();
3582 
3583  // only process it if we asked for it
3584  auto iter = originating_peer->items_requested_from_peer.find(
3585  item_id(message_to_process.msg_type.value(), message_hash) );
3586  if( iter == originating_peer->items_requested_from_peer.end() )
3587  {
3588  wlog( "received a message I didn't ask for from peer ${endpoint}, disconnecting from peer",
3589  ( "endpoint", originating_peer->get_remote_endpoint() ) );
3590  fc::exception detailed_error( FC_LOG_MESSAGE(error,
3591  "You sent me a message that I didn't ask for, message_hash: ${message_hash}",
3592  ( "message_hash", message_hash ) ) );
3593  disconnect_from_peer( originating_peer, "You sent me a message that I didn't request", true, detailed_error );
3594  return;
3595  }
3596  else
3597  {
3598  originating_peer->items_requested_from_peer.erase( iter );
3599  if (originating_peer->idle())
3601 
3602  // Next: have the delegate process the message
3603  fc::time_point message_validated_time;
3604  try
3605  {
3606  if (message_to_process.msg_type.value() == trx_message_type)
3607  {
3608  trx_message transaction_message_to_process = message_to_process.as<trx_message>();
3609  dlog( "passing message containing transaction ${trx} to client",
3610  ("trx", transaction_message_to_process.trx.id()) );
3611  _delegate->handle_transaction(transaction_message_to_process);
3612  }
3613  else
3614  _delegate->handle_message( message_to_process );
3615  message_validated_time = fc::time_point::now();
3616  }
3617  catch ( const fc::canceled_exception& )
3618  {
3619  throw;
3620  }
3621  catch ( const fc::exception& e )
3622  {
3623  switch( e.code() )
3624  {
3625  // log common exceptions in debug level
3626  case graphene::chain::duplicate_transaction::code_enum::code_value :
3627  case graphene::chain::limit_order_create_kill_unfilled::code_enum::code_value :
3628  case graphene::chain::limit_order_create_market_not_whitelisted::code_enum::code_value :
3629  case graphene::chain::limit_order_create_market_blacklisted::code_enum::code_value :
3630  case graphene::chain::limit_order_create_selling_asset_unauthorized::code_enum::code_value :
3631  case graphene::chain::limit_order_create_receiving_asset_unauthorized::code_enum::code_value :
3632  case graphene::chain::limit_order_create_insufficient_balance::code_enum::code_value :
3633  case graphene::chain::limit_order_update_nonexist_order::code_enum::code_value :
3634  case graphene::chain::limit_order_update_owner_mismatch::code_enum::code_value :
3635  case graphene::chain::limit_order_cancel_nonexist_order::code_enum::code_value :
3636  case graphene::chain::limit_order_cancel_owner_mismatch::code_enum::code_value :
3637  case graphene::chain::liquidity_pool_exchange_unfillable_price::code_enum::code_value :
3638  dlog( "client rejected message sent by peer ${peer}, ${e}",
3639  ("peer", originating_peer->get_remote_endpoint() )("e", e) );
3640  break;
3641  // log rarer exceptions in warn level
3642  default:
3643  wlog( "client rejected message sent by peer ${peer}, ${e}",
3644  ("peer", originating_peer->get_remote_endpoint() )("e", e) );
3645  break;
3646  }
3647  // record it so we don't try to fetch this item again
3649  item_id( message_to_process.msg_type.value(), message_hash ), fc::time_point::now() ) );
3650  return;
3651  }
3652 
3653  // finally, if the delegate validated the message, broadcast it to our other peers
3654  message_propagation_data propagation_data { message_receive_time, message_validated_time,
3655  originating_peer->node_id };
3656  broadcast( message_to_process, propagation_data );
3657  }
3658  }
3659 
3661  {
3663  peer->ids_of_items_to_get.clear();
3664  peer->number_of_unfetched_item_ids = 0;
3665  peer->we_need_sync_items_from_peer = true;
3666  peer->last_block_delegate_has_seen = item_hash_t();
3667  peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t());
3668  peer->inhibit_fetching_sync_blocks = false;
3670  }
3671 
3673  {
3675  for( const peer_connection_ptr& peer : _active_connections )
3677  }
3678 
3680  {
3682  peer->send_message(current_time_request_message(),
3683  offsetof(current_time_request_message, request_sent_time));
3686  {
3688  _delegate->connection_count_changed( _last_reported_number_of_conns );
3689  }
3690  // If it is an inbound connection, try to verify its inbound endpoint
3691  if( peer_connection_direction::inbound == peer->direction )
3692  {
3693  for( const auto& potential_inbound_endpoint : peer->potential_inbound_endpoints )
3694  _add_once_node_list.emplace_back( potential_inbound_endpoint.first );
3695  }
3696  }
3697 
3699  {
3701 
3702  try
3703  {
3705  }
3706  catch ( const fc::exception& e )
3707  {
3708  wlog( "Exception thrown while closing P2P peer database, ignoring: ${e}", ("e", e) );
3709  }
3710  catch (...)
3711  {
3712  wlog( "Exception thrown while closing P2P peer database, ignoring" );
3713  }
3714 
3715  // First, stop accepting incoming network connections
3716  try
3717  {
3718  _tcp_server.close();
3719  dlog("P2P TCP server closed");
3720  }
3721  catch ( const fc::exception& e )
3722  {
3723  wlog( "Exception thrown while closing P2P TCP server, ignoring: ${e}", ("e", e) );
3724  }
3725  catch (...)
3726  {
3727  wlog( "Exception thrown while closing P2P TCP server, ignoring" );
3728  }
3729 
3730  try
3731  {
3732  _accept_loop_complete.cancel_and_wait("node_impl::close()");
3733  dlog("P2P accept loop terminated");
3734  }
3735  catch ( const fc::exception& e )
3736  {
3737  wlog( "Exception thrown while terminating P2P accept loop, ignoring: ${e}", ("e", e) );
3738  }
3739  catch (...)
3740  {
3741  wlog( "Exception thrown while terminating P2P accept loop, ignoring" );
3742  }
3743 
3744  // terminate all of our long-running loops (these run continuously instead of rescheduling themselves)
3745  try
3746  {
3747  _p2p_network_connect_loop_done.cancel("node_impl::close()");
3748  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3751  dlog("P2P connect loop terminated");
3752  }
3753  catch ( const fc::canceled_exception& )
3754  {
3755  dlog("P2P connect loop terminated");
3756  }
3757  catch ( const fc::exception& e )
3758  {
3759  wlog( "Exception thrown while terminating P2P connect loop, ignoring: ${e}", ("e", e) );
3760  }
3761  catch (...)
3762  {
3763  wlog( "Exception thrown while terminating P2P connect loop, ignoring" );
3764  }
3765 
3766  try
3767  {
3769  dlog("Process backlog of sync items task terminated");
3770  }
3771  catch ( const fc::canceled_exception& )
3772  {
3773  dlog("Process backlog of sync items task terminated");
3774  }
3775  catch ( const fc::exception& e )
3776  {
3777  wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring: ${e}", ("e", e) );
3778  }
3779  catch (...)
3780  {
3781  wlog( "Exception thrown while terminating Process backlog of sync items task, ignoring" );
3782  }
3783 
3784  size_t handle_message_call_count = 0;
3785  while( true )
3786  {
3787  auto it = _handle_message_calls_in_progress.begin();
3788  if( it == _handle_message_calls_in_progress.end() )
3789  break;
3790  if( it->ready() || it->error() || it->canceled() )
3791  {
3793  continue;
3794  }
3795  ++handle_message_call_count;
3796  try
3797  {
3798  it->cancel_and_wait("node_impl::close()");
3799  dlog("handle_message call #${count} task terminated", ("count", handle_message_call_count));
3800  }
3801  catch ( const fc::canceled_exception& )
3802  {
3803  dlog("handle_message call #${count} task terminated", ("count", handle_message_call_count));
3804  }
3805  catch ( const fc::exception& e )
3806  {
3807  wlog("Exception thrown while terminating handle_message call #${count} task, ignoring: ${e}", ("e", e)("count", handle_message_call_count));
3808  }
3809  catch (...)
3810  {
3811  wlog("Exception thrown while terminating handle_message call #${count} task, ignoring",("count", handle_message_call_count));
3812  }
3813  }
3814 
3815  try
3816  {
3817  _fetch_sync_items_loop_done.cancel("node_impl::close()");
3818  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3821  dlog("Fetch sync items loop terminated");
3822  }
3823  catch ( const fc::canceled_exception& )
3824  {
3825  dlog("Fetch sync items loop terminated");
3826  }
3827  catch ( const fc::exception& e )
3828  {
3829  wlog( "Exception thrown while terminating Fetch sync items loop, ignoring: ${e}", ("e", e) );
3830  }
3831  catch (...)
3832  {
3833  wlog( "Exception thrown while terminating Fetch sync items loop, ignoring" );
3834  }
3835 
3836  try
3837  {
3838  _fetch_item_loop_done.cancel("node_impl::close()");
3839  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3842  dlog("Fetch items loop terminated");
3843  }
3844  catch ( const fc::canceled_exception& )
3845  {
3846  dlog("Fetch items loop terminated");
3847  }
3848  catch ( const fc::exception& e )
3849  {
3850  wlog( "Exception thrown while terminating Fetch items loop, ignoring: ${e}", ("e", e) );
3851  }
3852  catch (...)
3853  {
3854  wlog( "Exception thrown while terminating Fetch items loop, ignoring" );
3855  }
3856 
3857  try
3858  {
3859  _advertise_inventory_loop_done.cancel("node_impl::close()");
3860  // cancel() is currently broken, so we need to wake up the task to allow it to finish
3863  dlog("Advertise inventory loop terminated");
3864  }
3865  catch ( const fc::canceled_exception& )
3866  {
3867  dlog("Advertise inventory loop terminated");
3868  }
3869  catch ( const fc::exception& e )
3870  {
3871  wlog( "Exception thrown while terminating Advertise inventory loop, ignoring: ${e}", ("e", e) );
3872  }
3873  catch (...)
3874  {
3875  wlog( "Exception thrown while terminating Advertise inventory loop, ignoring" );
3876  }
3877 
3878 
3879  // Next, terminate our existing connections. First, close all of the connections nicely.
3880  // This will close the sockets and may result in calls to our "on_connection_closing"
3881  // method to inform us that the connection really closed (or may not if we manage to cancel
3882  // the read loop before it gets an EOF).
3883  // operate off copies of the lists in case they change during iteration
3884  std::list<peer_connection_ptr> all_peers;
3885  auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); };
3886  {
3889  }
3890  {
3893  }
3894  {
3897  }
3898 
3899  for (const peer_connection_ptr& peer : all_peers)
3900  {
3901  try
3902  {
3903  peer->destroy_connection();
3904  }
3905  catch ( const fc::exception& e )
3906  {
3907  wlog( "Exception thrown while closing peer connection, ignoring: ${e}", ("e", e) );
3908  }
3909  catch (...)
3910  {
3911  wlog( "Exception thrown while closing peer connection, ignoring" );
3912  }
3913  }
3914 
3915  // and delete all of the peer_connection objects
3919  all_peers.clear();
3920 
3921  {
3922 #ifdef USE_PEERS_TO_DELETE_MUTEX
3923  fc::scoped_lock<fc::mutex> lock(_peers_to_delete_mutex);
3924 #endif
3925  try
3926  {
3927  _delayed_peer_deletion_task_done.cancel_and_wait("node_impl::close()");
3928  dlog("Delayed peer deletion task terminated");
3929  }
3930  catch ( const fc::exception& e )
3931  {
3932  wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring: ${e}", ("e", e) );
3933  }
3934  catch (...)
3935  {
3936  wlog( "Exception thrown while terminating Delayed peer deletion task, ignoring" );
3937  }
3938  _peers_to_delete.clear();
3939  }
3940 
3941  // Now that there are no more peers that can call methods on us, there should be no
3942  // chance for one of our loops to be rescheduled, so we can safely terminate all of
3943  // our loops now
3944  try
3945  {
3946  _kill_inactive_conns_loop_done.cancel_and_wait("node_impl::close()");
3947  dlog("Kill inactive connections loop terminated");
3948  }
3949  catch ( const fc::exception& e )
3950  {
3951  wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring: ${e}", ("e", e) );
3952  }
3953  catch (...)
3954  {
3955  wlog( "Exception thrown while terminating Terminate inactive connections loop, ignoring" );
3956  }
3957 
3958  try
3959  {
3961  dlog("Fetch updated peer lists loop terminated");
3962  }
3963  catch ( const fc::exception& e )
3964  {
3965  wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring: ${e}", ("e", e) );
3966  }
3967  catch (...)
3968  {
3969  wlog( "Exception thrown while terminating Fetch updated peer lists loop, ignoring" );
3970  }
3971 
3972  try
3973  {
3974  _update_seed_nodes_loop_done.cancel_and_wait("node_impl::close()");
3975  dlog("Update seed nodes loop terminated");
3976  }
3977  catch ( const fc::exception& e )
3978  {
3979  wlog( "Exception thrown while terminating Update seed nodes loop, ignoring: ${e}", ("e", e) );
3980  }
3981  catch (...)
3982  {
3983  wlog( "Exception thrown while terminating Update seed nodes loop, ignoring" );
3984  }
3985 
3986  try
3987  {
3988  _bandwidth_monitor_loop_done.cancel_and_wait("node_impl::close()");
3989  dlog("Bandwidth monitor loop terminated");
3990  }
3991  catch ( const fc::exception& e )
3992  {
3993  wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring: ${e}", ("e", e) );
3994  }
3995  catch (...)
3996  {
3997  wlog( "Exception thrown while terminating Bandwidth monitor loop, ignoring" );
3998  }
3999 
4000  try
4001  {
4002  _dump_node_status_task_done.cancel_and_wait("node_impl::close()");
4003  dlog("Dump node status task terminated");
4004  }
4005  catch ( const fc::exception& e )
4006  {
4007  wlog( "Exception thrown while terminating Dump node status task, ignoring: ${e}", ("e", e) );
4008  }
4009  catch (...)
4010  {
4011  wlog( "Exception thrown while terminating Dump node status task, ignoring" );
4012  }
4013  } // node_impl::close()
4014 
4016  {
4018  new_peer->accept_connection(); // this blocks until the secure connection is fully negotiated
4019  send_hello_message(new_peer);
4020  }
4021 
4023  {
4025  while ( !_accept_loop_complete.canceled() )
4026  {
4028 
4029  try
4030  {
4031  _tcp_server.accept( new_peer->get_socket() );
4032  ilog( "accepted inbound connection from ${remote_endpoint}",
4033  ("remote_endpoint", new_peer->get_socket().remote_endpoint() ) );
4035  return;
4036  new_peer->connection_initiation_time = fc::time_point::now();
4037  _handshaking_connections.insert( new_peer );
4038  _rate_limiter.add_tcp_socket( &new_peer->get_socket() );
4039  std::weak_ptr<peer_connection> new_weak_peer(new_peer);
4040  new_peer->accept_or_connect_task_done = fc::async( [this, new_weak_peer]() {
4041  peer_connection_ptr new_peer(new_weak_peer.lock());
4042  assert(new_peer);
4043  if (!new_peer)
4044  return;
4045  accept_connection_task(new_peer);
4046  }, "accept_connection_task" );
4047 
4048  // limit the rate at which we accept connections to mitigate DOS attacks
4050  } FC_CAPTURE_AND_LOG( (0) ) // GCOVR_EXCL_LINE
4051  }
4052  } // accept_loop()
4053 
4055  {
4058 
4059  fc::sha256::encoder shared_secret_encoder;
4060  fc::sha512 shared_secret = peer->get_shared_secret();
4061  shared_secret_encoder.write(shared_secret.data(), sizeof(shared_secret));
4063  = _node_configuration.private_key.sign_compact(shared_secret_encoder.result());
4064 
4065  // In the hello messsage, we send three things:
4066  // * inbound IP address
4067  // * inbound port
4068  // * outbound port
4069  //
4070  // If we don't accept incoming connections, we send nothing.
4071  //
4072  // The peer we're connecting to may assume we're firewalled if the
4073  // IP address and outbound port we send don't match the values it sees on its remote endpoint,
4074  // but it is not always true, E.G. if the peer itself is behind a reverse proxy.
4075  //
4076  // Note: we no longer perform remote firewall check (ask the peer to check whether we are firewalled),
4077  // thus we don't know our external IP address,
4078  // nor we know whether we're behind NAT or a reverse proxy that will allow incoming connections.
4079  // However, if the "p2p-inbound-endpoint" node startup option is configured, we send that instead.
4080 
4081  fc::ip::address inbound_address; // default 0.0.0.0
4082  uint16_t inbound_port = 0;
4083  uint16_t outbound_port = 0;
4085  {
4086  fc::ip::endpoint local_endpoint = peer->get_socket().local_endpoint();
4087  inbound_address = local_endpoint.get_address();
4088  inbound_port = _actual_listening_endpoint.port();
4089  outbound_port = local_endpoint.port();
4091  {
4093  inbound_address = _node_configuration.inbound_endpoint->get_address();
4094  inbound_port = _node_configuration.inbound_endpoint->port();
4095  }
4096  }
4097 
4100  inbound_address,
4101  inbound_port,
4102  outbound_port,
4104  signature,
4105  _chain_id,
4107 
4108  peer->send_message(message(hello));
4109  }
4110 
4112  const fc::ip::endpoint& remote_endpoint)
4113  {
4115 
4116  // create or find the database entry for the new peer
4117  auto updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_ep(remote_endpoint);
4118  updated_peer_record.last_connection_disposition = last_connection_failed;
4119  updated_peer_record.last_connection_attempt_time = fc::time_point::now();;
4120  _potential_peer_db.update_entry(updated_peer_record);
4121 
4122  fc::oexception connect_failed_exception;
4123 
4124  try
4125  {
4126  ilog("Connecting to peer ${peer}", ("peer", remote_endpoint));
4127  // blocks until the connection is established and secure connection is negotiated
4130  new_peer->connect_to( remote_endpoint, bind_to_endpoint );
4131 
4132  // we connected to the peer. guess they're not firewalled....
4133  new_peer->is_firewalled = firewalled_state::not_firewalled;
4134 
4135  // connection succeeded, we've started handshaking. record that in our database
4136  updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_ep(remote_endpoint);
4137  updated_peer_record.last_connection_disposition = last_connection_handshaking_failed;
4138  updated_peer_record.number_of_successful_connection_attempts++;
4139  updated_peer_record.last_seen_time = fc::time_point::now();
4140  _potential_peer_db.update_entry(updated_peer_record);
4141  }
4142  catch (const fc::exception& except)
4143  {
4144  connect_failed_exception = except;
4145  }
4146 
4147  if( connect_failed_exception )
4148  {
4149  // connection failed. record that in our database
4150  updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_ep(remote_endpoint);
4151  updated_peer_record.last_connection_disposition = last_connection_failed;
4152  updated_peer_record.number_of_failed_connection_attempts++;
4153  if (new_peer->connection_closed_error)
4154  updated_peer_record.last_error = *new_peer->connection_closed_error;
4155  else
4156  updated_peer_record.last_error = *connect_failed_exception;
4157  _potential_peer_db.update_entry(updated_peer_record);
4158 
4159  // If this is for inbound endpoint verification,
4160  // here we could try to find the original connection and update its firewalled state,
4161  // but it doesn't seem necessary.
4162 
4163  // if the connection failed, we want to disconnect now.
4164  _handshaking_connections.erase(new_peer);
4165  _terminating_connections.erase(new_peer);
4166  _active_connections.erase(new_peer);
4167  _closing_connections.erase(new_peer);
4168 
4171  schedule_peer_for_deletion(new_peer);
4172 
4173  throw *connect_failed_exception;
4174  }
4175  else
4176  {
4177  // connection was successful and we want to stay connected
4178  fc::ip::endpoint local_endpoint = new_peer->get_local_endpoint();
4179  new_peer->inbound_address = local_endpoint.get_address();
4181  : 0;
4182  new_peer->outbound_port = local_endpoint.port();
4183 
4186  send_hello_message(new_peer);
4187  ilog("Sent \"hello\" to peer ${peer}", ("peer", new_peer->get_remote_endpoint()));
4188  }
4189  }
4190 
4191  // methods implementing node's public interface
4192  void node_impl::set_node_delegate(std::shared_ptr<node_delegate> del, fc::thread* thread_for_delegate_calls)
4193  {
4195  _delegate.reset();
4196  if (del)
4197  _delegate = std::make_unique<statistics_gathering_node_delegate_wrapper>(del, thread_for_delegate_calls);
4198  if( _delegate )
4199  _chain_id = del->get_chain_id();
4200  }
4201 
4202  void node_impl::load_configuration( const fc::path& configuration_directory )
4203  {
4205  _node_configuration_directory = configuration_directory;
4207  bool node_configuration_loaded = false;
4208  if( fc::exists(configuration_file_name ) )
4209  {
4210  try
4211  {
4213  ilog( "Loaded configuration from file ${filename}", ("filename", configuration_file_name ) );
4214 
4217 
4218  node_configuration_loaded = true;
4219  }
4220  catch ( fc::parse_error_exception& parse_error )
4221  {
4222  elog( "malformed node configuration file ${filename}: ${error}",
4223  ( "filename", configuration_file_name )("error", parse_error.to_detail_string() ) );
4224  }
4225  catch ( fc::exception& except )
4226  {
4227  elog( "unexpected exception while reading configuration file ${filename}: ${error}",
4228  ( "filename", configuration_file_name )("error", except.to_detail_string() ) );
4229  }
4230  }
4231 
4232  if( !node_configuration_loaded )
4233  {
4235 
4236 #ifdef GRAPHENE_TEST_NETWORK
4237  uint32_t port = GRAPHENE_NET_TEST_P2P_PORT;
4238 #else
4239  uint32_t port = GRAPHENE_NET_DEFAULT_P2P_PORT;
4240 #endif
4242 
4243  ilog( "generating new private key for this node" );
4246  }
4247 
4249 
4250  fc::path potential_peer_database_file_name(_node_configuration_directory / POTENTIAL_PEER_DATABASE_FILENAME);
4251  try
4252  {
4253  _potential_peer_db.open(potential_peer_database_file_name);
4254 
4255  // push back the time on all peers loaded from the database so we will be able to retry them immediately
4256  // Note: this step is almost useless because we didn't multiply _peer_connection_retry_timeout
4257  // by number_of_failed_connection_attempts. However, it is probably desired as we don't want
4258  // to try to connect to a large number of dead nodes at startup.
4259  // As of writing, _peer_connection_retry_timeout is 30 seconds, pushing the time back that much
4260  // won't have much impact in production.
4261  // TODO Perhaps just remove it.
4263  {
4264  potential_peer_record updated_peer_record = *itr;
4265  updated_peer_record.last_connection_attempt_time = std::min<fc::time_point_sec>(updated_peer_record.last_connection_attempt_time,
4267  _potential_peer_db.update_entry(updated_peer_record);
4268  }
4269 
4271  }
4272  catch (fc::exception& except)
4273  {
4274  elog("unable to open peer database ${filename}: ${error}",
4275  ("filename", potential_peer_database_file_name)("error", except.to_detail_string()));
4276  throw;
4277  }
4278  }
4279 
4281  {
4284  {
4285  wlog("accept_incoming_connections is false, p2p network will not accept any incoming connections");
4286  return;
4287  }
4288 
4290 
4292  if( listen_endpoint.port() != 0 )
4293  {
4294  // if the user specified a port, we only want to bind to it if it's not already
4295  // being used by another application. During normal operation, we set the
4296  // SO_REUSEADDR/SO_REUSEPORT flags so that we can bind outbound sockets to the
4297  // same local endpoint as we're listening on here. On some platforms, setting
4298  // those flags will prevent us from detecting that other applications are
4299  // listening on that port. We'd like to detect that, so we'll set up a temporary
4300  // tcp server without that flag to see if we can listen on that port.
4301  // Note: There is a race condition where another application may start listening
4302  // on the same port just after the temporary tcp server is destroyed and
4303  // before we try to listen with the real tcp server.
4304  // This happens frequently when running multiple test cases at the same
4305  // time, but less likely in production.
4306  bool first = true;
4307  for( ;; )
4308  {
4309  bool listen_failed = false;
4310 
4311  try
4312  {
4313  fc::tcp_server temporary_server;
4314  if( listen_endpoint.get_address() != fc::ip::address() )
4315  temporary_server.listen( listen_endpoint );
4316  else
4317  temporary_server.listen( listen_endpoint.port() );
4318  break;
4319  }
4320  catch ( const fc::exception&)
4321  {
4322  listen_failed = true;
4323  }
4324 
4325  if (listen_failed)
4326  {
4328  {
4329  std::ostringstream error_message_stream;
4330  if( first )
4331  {
4332  error_message_stream << "Unable to listen for connections on port " << listen_endpoint.port()
4333  << ", retrying in a few seconds\n";
4334  error_message_stream << "You can wait for it to become available, or restart this program using\n";
4335  error_message_stream << "the --p2p-endpoint option to specify another port\n";
4336  first = false;
4337  }
4338  else
4339  {
4340  error_message_stream << "\nStill waiting for port " << listen_endpoint.port() << " to become available\n";
4341  }
4342  std::string error_message = error_message_stream.str();
4343  wlog(error_message);
4344  std::cout << "\033[31m" << error_message;
4345  _delegate->error_encountered( error_message, fc::oexception() );
4346  fc::usleep( fc::seconds(5 ) );
4347  }
4348  else // don't wait, just find a random port
4349  {
4350  wlog( "unable to bind on the requested endpoint ${endpoint}, which probably means that endpoint is already in use",
4351  ( "endpoint", listen_endpoint ) );
4352  listen_endpoint.set_port( 0 );
4353  }
4354  } // if (listen_failed)
4355  } // for(;;)
4356  } // if (listen_endpoint.port() != 0)
4357  else // port is 0
4358  {
4359  // if they requested a random port, we'll just assume it's available
4360  // (it may not be due to ip address, but we'll detect that in the next step)
4361  }
4362 
4364  try
4365  {
4366  if( listen_endpoint.get_address() != fc::ip::address() )
4367  _tcp_server.listen( listen_endpoint );
4368  else
4369  _tcp_server.listen( listen_endpoint.port() );
4371  ilog( "listening for connections on endpoint ${endpoint} (our first choice)",
4372  ( "endpoint", _actual_listening_endpoint ) );
4373  }
4374  catch ( fc::exception& e )
4375  {
4376  FC_RETHROW_EXCEPTION( e, error, "unable to listen on ${endpoint}", ("endpoint",listen_endpoint ) );
4377  }
4378  }
4379 
4381  {
4384 
4385  assert(!_accept_loop_complete.valid() &&
4396  _accept_loop_complete = fc::async( [this](){ accept_loop(); }, "accept_loop");
4397 
4399  "p2p_network_connect_loop" );
4400  _fetch_sync_items_loop_done = fc::async( [this]() { fetch_sync_items_loop(); }, "fetch_sync_items_loop" );
4401  _fetch_item_loop_done = fc::async( [this]() { fetch_items_loop(); }, "fetch_items_loop" );
4403  "advertise_inventory_loop" );
4405  "kill_inactive_conns_loop" );
4407  "fetch_updated_peer_lists_loop");
4408  _bandwidth_monitor_loop_done = fc::async([this](){ bandwidth_monitor_loop(); }, "bandwidth_monitor_loop");
4409  _dump_node_status_task_done = fc::async([this](){ dump_node_status_task(); }, "dump_node_status_task");
4411  }
4412 
4414  {
4416  // if we're connecting to them, we believe they're not firewalled
4417  auto updated_peer_record = _potential_peer_db.lookup_or_create_entry_for_ep(ep);
4418 
4419  // if we've recently connected to this peer, reset the last_connection_attempt_time to allow
4420  // us to immediately retry this peer
4421  // Note: to make it work, we need to multiply _peer_connection_retry_timeout
4422  // by number_of_failed_connection_attempts.
4423  // However, this step is almost useless because we will immediately try to connect anyway
4424  // due to _add_once_node_list.
4425  // On the other hand, if we connected to the peer already but it was not in the peer database somehow,
4426  // this step makes sure that it will be added to the peer database.
4427  auto delay_until_retry = fc::seconds( (updated_peer_record.number_of_failed_connection_attempts + 1)
4429  updated_peer_record.last_connection_attempt_time
4430  = std::min<fc::time_point_sec>( updated_peer_record.last_connection_attempt_time,
4431  fc::time_point::now() - delay_until_retry );
4432  _add_once_node_list.push_back(updated_peer_record);
4433  _potential_peer_db.update_entry(updated_peer_record);
4435  }
4436 
4437  void node_impl::add_seed_node(const std::string& endpoint_string)
4438  {
4440  _seed_nodes.insert( endpoint_string );
4441  resolve_seed_node_and_add( endpoint_string );
4442  }
4443 
4453  static std::vector<fc::ip::endpoint> resolve_string_to_ip_endpoints(const std::string& in)
4454  {
4455  try
4456  {
4457  std::string::size_type colon_pos = in.find(':');
4458  if (colon_pos == std::string::npos)
4459  FC_THROW("Missing required port number in endpoint string \"${endpoint_string}\"",
4460  ("endpoint_string", in));
4461  std::string port_string = in.substr(colon_pos + 1);
4462  try
4463  {
4464  uint16_t port = boost::lexical_cast<uint16_t>(port_string);
4465 
4466  std::string hostname = in.substr(0, colon_pos);
4467  std::vector<fc::ip::endpoint> endpoints = fc::resolve(hostname, port);
4468  if (endpoints.empty())
4469  FC_THROW_EXCEPTION( fc::unknown_host_exception,
4470  "The host name can not be resolved: ${hostname}",
4471  ("hostname", hostname) );
4472  return endpoints;
4473  }
4474  catch (const boost::bad_lexical_cast&)
4475  {
4476  FC_THROW("Bad port: ${port}", ("port", port_string));
4477  }
4478  }
4479  FC_CAPTURE_AND_RETHROW((in)) // GCOVR_EXCL_LINE
4480  }
4481 
4482  void node_impl::resolve_seed_node_and_add(const std::string& endpoint_string)
4483  {
4485  std::vector<fc::ip::endpoint> endpoints;
4486  ilog("Resolving seed node ${endpoint}", ("endpoint", endpoint_string));
4487  try
4488  {
4489  endpoints = resolve_string_to_ip_endpoints(endpoint_string);
4490  }
4491  catch(...)
4492  {
4493  wlog( "Unable to resolve endpoint during attempt to add seed node ${ep}", ("ep", endpoint_string) );
4494  }
4495  for (const fc::ip::endpoint& endpoint : endpoints)
4496  {
4497  ilog("Adding seed node ${endpoint}", ("endpoint", endpoint));
4498  add_node(endpoint);
4499  }
4500  }
4501 
4503  {
4504  new_peer->get_socket().open();
4505  new_peer->get_socket().set_reuse_address();
4506  new_peer->connection_initiation_time = fc::time_point::now();
4507  _handshaking_connections.insert(new_peer);
4508  _rate_limiter.add_tcp_socket(&new_peer->get_socket());
4509 
4511  return;
4512 
4513  std::weak_ptr<peer_connection> new_weak_peer(new_peer);
4514  new_peer->accept_or_connect_task_done = fc::async([this, new_weak_peer](){
4515  peer_connection_ptr new_peer(new_weak_peer.lock());
4516  assert(new_peer);
4517  if (!new_peer)
4518  return;
4519  connect_to_task(new_peer, *new_peer->get_remote_endpoint());
4520  }, "connect_to_task");
4521  }
4522 
4524  {
4526  if( is_connected_to_endpoint(remote_endpoint) )
4527  FC_THROW_EXCEPTION(already_connected_to_requested_peer, "already connected to requested endpoint ${endpoint}",
4528  ("endpoint", remote_endpoint));
4529 
4530  dlog("node_impl::connect_to_endpoint(${endpoint})", ("endpoint", remote_endpoint));
4532  new_peer->set_remote_endpoint(remote_endpoint);
4533  initiate_connect_to(new_peer);
4534  }
4535 
4537  {
4540  for( const peer_connection_ptr& active_peer : _active_connections )
4541  {
4542  // Note: for outbound connections, checking by remote_endpoint is OK,
4543  // and we will ignore the inbound address and port it sends to us when handshaking.
4544  // For an inbound active connection, we want to verify its inbound endpoint, if it happens to be
4545  // the same as remote_endpoint but not yet verified, we consider it as not connected.
4546  // * If verification succeeds, we will mark it as "verified" and won't try to connect again.
4547  // * We may fail to verify if it is firewalled, in this case number_of_failed_connection_attempts
4548  // will increase, so we will not reconnect soon, but will wait longer and longer.
4549  fc::optional<fc::ip::endpoint> endpoint_for_this_peer = active_peer->get_remote_endpoint();
4550  if( peer_connection_direction::outbound == active_peer->direction
4551  && endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
4552  return active_peer;
4553  // Note: if it is an inbound connection and its inbound endpoint is verified already,
4554  // the inbound endpoint should be in additional_inbound_endpoints
4555  if( active_peer->additional_inbound_endpoints.find( remote_endpoint )
4556  != active_peer->additional_inbound_endpoints.end() )
4557  return active_peer;
4558  }
4559  return peer_connection_ptr();
4560  }
4561 
4563  {
4565  peer_connection_ptr active_ptr = get_active_conn_for_endpoint( remote_endpoint );
4566  if ( active_ptr != peer_connection_ptr() )
4567  return active_ptr;
4569  for( const peer_connection_ptr& handshaking_peer : _handshaking_connections )
4570  {
4571  // For an inbound handshaking connection, there is a race condition since we might not know its node_id yet,
4572  // so be stricter here.
4573  // Even so, there may be situations that we end up having multiple active connections with them.
4574  fc::optional<fc::ip::endpoint> endpoint_for_this_peer = handshaking_peer->get_remote_endpoint();
4575  if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
4576  return handshaking_peer;
4577  // Note: if it is an inbound connection and its inbound endpoint is verified already,
4578  // the inbound endpoint should be in additional_inbound_endpoints
4579  if( handshaking_peer->additional_inbound_endpoints.find( remote_endpoint )
4580  != handshaking_peer->additional_inbound_endpoints.end() )
4581  return handshaking_peer;
4582  }
4583  return peer_connection_ptr();
4584  }
4585 
4586  bool node_impl::is_connected_to_endpoint( const fc::ip::endpoint& remote_endpoint ) const
4587  {
4589  return get_connection_for_endpoint( remote_endpoint ) != peer_connection_ptr();
4590  }
4591 
4593  {
4599  }
4600 
4602  {
4604  _active_connections.erase(peer);
4608  }
4609 
4611  {
4613  _active_connections.erase(peer);
4617  }
4618 
4620  {
4622  ilog( "----------------- PEER STATUS UPDATE --------------------" );
4623  ilog( " number of peers: ${active} active, ${handshaking} handshaking, ${closing} closing. "
4624  " attempting to maintain ${desired} - ${maximum} peers",
4625  ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )
4626  ( "closing", _closing_connections.size() )
4627  ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) );
4628  {
4630  for( const peer_connection_ptr& peer : _active_connections )
4631  {
4632  ilog( " active peer ${endpoint} [${direction}] (${inbound_ep} ${is_firewalled}) "
4633  "peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}",
4634  ( "endpoint", peer->get_remote_endpoint() )
4635  ( "direction", peer->direction )
4636  ( "inbound_ep", peer->get_endpoint_for_connecting() )
4637  ( "is_firewalled", peer->is_firewalled)
4638  ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )
4639  ( "in_sync_with_them", !peer->we_need_sync_items_from_peer ) );
4640  if( peer->we_need_sync_items_from_peer )
4641  ilog( " above peer has ${count} sync items we might need",
4642  ("count", peer->ids_of_items_to_get.size() ) );
4643  if (peer->inhibit_fetching_sync_blocks)
4644  ilog( " we are not fetching sync blocks from the above peer "
4645  "(inhibit_fetching_sync_blocks == true)" );
4646 
4647  }
4648  }
4649  {
4651  for( const peer_connection_ptr& peer : _handshaking_connections )
4652  {
4653  ilog( " handshaking peer ${endpoint} [${direction}] in state ours(${our_state}) theirs(${their_state})",
4654  ( "endpoint", peer->get_remote_endpoint() )
4655  ( "direction", peer->direction )
4656  ( "our_state", peer->our_state )( "their_state", peer->their_state ) );
4657  }
4658  }
4659  ilog( "--------- MEMORY USAGE ------------" );
4660  ilog( "node._active_sync_requests size: ${size}", ("size", _active_sync_requests.size() ) );
4661  ilog( "node._received_sync_items size: ${size}", ("size", _received_sync_items.size() ) );
4662  ilog( "node._new_received_sync_items size: ${size}", ("size", _new_received_sync_items.size() ) );
4663  ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) );
4664  ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) );
4665  ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) );
4667  for( const peer_connection_ptr& peer : _active_connections )
4668  {
4669  ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) );
4670  ilog( " peer.ids_of_items_to_get size: ${size}", ("size", peer->ids_of_items_to_get.size() ) );
4671  ilog( " peer.inventory_peer_advertised_to_us size: ${size}", ("size", peer->inventory_peer_advertised_to_us.size() ) );
4672  ilog( " peer.inventory_advertised_to_peer size: ${size}", ("size", peer->inventory_advertised_to_peer.size() ) );
4673  ilog( " peer.items_requested_from_peer size: ${size}", ("size", peer->items_requested_from_peer.size() ) );
4674  ilog( " peer.sync_items_requested_from_peer size: ${size}", ("size", peer->sync_items_requested_from_peer.size() ) );
4675  }
4676  ilog( "--------- END MEMORY USAGE ------------" );
4677  }
4678 
4680  const std::string& reason_for_disconnect,
4681  bool caused_by_error /* = false */,
4682  const fc::oexception& error /* = fc::oexception() */ )
4683  {
4685  move_peer_to_closing_list(peer_to_disconnect->shared_from_this());
4686 
4687  if (peer_to_disconnect->they_have_requested_close)
4688  {
4689  // the peer has already told us that it's ready to close the connection, so just close the connection
4690  peer_to_disconnect->close_connection();
4691  }
4692  else if( peer_to_disconnect->we_have_requested_close )
4693  {
4694  dlog( "Disconnecting again from ${peer} for ${reason}, ignore",
4695  ("peer",peer_to_disconnect->get_remote_endpoint()) ("reason",reason_for_disconnect));
4696  return;
4697  }
4698  else
4699  {
4700  // we're the first to try to want to close the connection
4701  fc::optional<fc::ip::endpoint> inbound_endpoint = peer_to_disconnect->get_endpoint_for_connecting();
4702  if( inbound_endpoint.valid() && inbound_endpoint->port() != 0 )
4703  {
4704  fc::optional<potential_peer_record> updated_peer_record
4705  = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint);
4706  if (updated_peer_record)
4707  {
4708  updated_peer_record->last_seen_time = fc::time_point::now();
4709  if (error)
4710  updated_peer_record->last_error = error;
4711  else
4712  updated_peer_record->last_error = fc::exception(FC_LOG_MESSAGE(info, reason_for_disconnect.c_str()));
4713  _potential_peer_db.update_entry(*updated_peer_record);
4714  }
4715  }
4716  peer_to_disconnect->we_have_requested_close = true;
4717  peer_to_disconnect->connection_closed_time = fc::time_point::now();
4718 
4719  closing_connection_message closing_message( reason_for_disconnect, caused_by_error, error );
4720  peer_to_disconnect->send_message( closing_message );
4721  }
4722 
4723  // notify the user. This will be useful in testing, but we might want to remove it later.
4724  // It makes good sense to notify the user if other nodes think she is behaving badly, but
4725  // if we're just detecting and dissconnecting other badly-behaving nodes, they don't really care.
4726  if (caused_by_error)
4727  {
4728  std::ostringstream error_message;
4729  error_message << "I am disconnecting peer " << fc::variant( peer_to_disconnect->get_remote_endpoint(), GRAPHENE_NET_MAX_NESTED_OBJECTS ).as_string() <<
4730  " for reason: " << reason_for_disconnect;
4731  _delegate->error_encountered(error_message.str(), fc::oexception());
4732  dlog(error_message.str());
4733  }
4734  else
4735  dlog("Disconnecting from ${peer} for ${reason}", ("peer",peer_to_disconnect->get_remote_endpoint()) ("reason",reason_for_disconnect));
4736  }
4737 
4738  void node_impl::set_listen_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available )
4739  {
4742  _node_configuration.wait_if_endpoint_is_busy = wait_if_not_available;
4744  }
4745 
4747  {
4751  }
4752 
4754  {
4758  }
4759 
4761  {
4765  }
4766 
4768  {
4771  }
4772 
4773  std::vector<peer_status> node_impl::get_connected_peers() const
4774  {
4776  std::vector<peer_status> statuses;
4778  for (const peer_connection_ptr& peer : _active_connections)
4779  {
4780  peer_status this_peer_status;
4781  this_peer_status.version = 0;
4782  fc::optional<fc::ip::endpoint> endpoint = peer->get_remote_endpoint();
4783  if (endpoint)
4784  this_peer_status.host = *endpoint;
4785  fc::mutable_variant_object peer_details;
4786  peer_details["addr"] = endpoint ? (std::string)*endpoint : std::string();
4787  peer_details["addrlocal"] = (std::string)peer->get_local_endpoint();
4788  peer_details["services"] = "00000001";
4789  peer_details["lastsend"] = peer->get_last_message_sent_time().sec_since_epoch();
4790  peer_details["lastrecv"] = peer->get_last_message_received_time().sec_since_epoch();
4791  peer_details["bytessent"] = peer->get_total_bytes_sent();
4792  peer_details["bytesrecv"] = peer->get_total_bytes_received();
4793  peer_details["conntime"] = peer->get_connection_time();
4794  peer_details["pingtime"] = "";
4795  peer_details["pingwait"] = "";
4796  peer_details["version"] = "";
4797  peer_details["subver"] = peer->user_agent;
4798  peer_details["inbound"] = peer->direction == peer_connection_direction::inbound;
4799  peer_details["firewall_status"] = fc::variant( peer->is_firewalled, 1 );
4800  peer_details["startingheight"] = "";
4801  peer_details["banscore"] = "";
4802  peer_details["syncnode"] = "";
4803 
4804  if (peer->fc_git_revision_sha)
4805  {
4806  std::string revision_string = *peer->fc_git_revision_sha;
4807  if (*peer->fc_git_revision_sha == fc::git_revision_sha)
4808  revision_string += " (same as ours)";
4809  else
4810  revision_string += " (different from ours)";
4811  peer_details["fc_git_revision_sha"] = revision_string;
4812 
4813  }
4814  if (peer->fc_git_revision_unix_timestamp)
4815  {
4816  peer_details["fc_git_revision_unix_timestamp"] = *peer->fc_git_revision_unix_timestamp;
4817  std::string age_string = fc::get_approximate_relative_time_string( *peer->fc_git_revision_unix_timestamp);
4818  if (*peer->fc_git_revision_unix_timestamp == fc::time_point_sec(fc::git_revision_unix_timestamp))
4819  age_string += " (same as ours)";
4820  else if (*peer->fc_git_revision_unix_timestamp > fc::time_point_sec(fc::git_revision_unix_timestamp))
4821  age_string += " (newer than ours)";
4822  else
4823  age_string += " (older than ours)";
4824  peer_details["fc_git_revision_age"] = age_string;
4825  }
4826 
4827  if (peer->platform)
4828  peer_details["platform"] = *peer->platform;
4829 
4830  // provide these for debugging
4831  // warning: these are just approximations, if the peer is "downstream" of us, they may
4832  // have received blocks from other peers that we are unaware of
4833  peer_details["current_head_block"] = fc::variant( peer->last_block_delegate_has_seen, 1 );
4834  peer_details["current_head_block_number"] = _delegate->get_block_number(peer->last_block_delegate_has_seen);
4835  peer_details["current_head_block_time"] = peer->last_block_time_delegate_has_seen;
4836 
4837  peer_details["peer_needs_sync_items_from_us"] = peer->peer_needs_sync_items_from_us;
4838  peer_details["we_need_sync_items_from_peer"] = peer->we_need_sync_items_from_peer;
4839 
4840  this_peer_status.info = peer_details;
4841  statuses.push_back(this_peer_status);
4842  }
4843  return statuses;
4844  }
4845 
4847  {
4849  return (uint32_t)_active_connections.size();
4850  }
4851 
4852  void node_impl::broadcast( const message& item_to_broadcast, const message_propagation_data& propagation_data )
4853  {
4855  message_hash_type hash_of_message_contents;
4856  if( item_to_broadcast.msg_type.value() == graphene::net::block_message_type )
4857  {
4858  graphene::net::block_message block_message_to_broadcast = item_to_broadcast.as<graphene::net::block_message>();
4859  hash_of_message_contents = block_message_to_broadcast.block_id; // for debugging
4860  _most_recent_blocks_accepted.push_back( block_message_to_broadcast.block_id );
4861  }
4862  else if( item_to_broadcast.msg_type.value() == graphene::net::trx_message_type )
4863  {
4864  graphene::net::trx_message transaction_message_to_broadcast = item_to_broadcast.as<graphene::net::trx_message>();
4865  hash_of_message_contents = transaction_message_to_broadcast.trx.id(); // for debugging
4866  dlog( "broadcasting trx: ${trx}", ("trx", transaction_message_to_broadcast) );
4867  }
4868  message_hash_type hash_of_item_to_broadcast = item_to_broadcast.id();
4869 
4870  _message_cache.cache_message( item_to_broadcast, hash_of_item_to_broadcast, propagation_data, hash_of_message_contents );
4871  _new_inventory.insert( item_id(item_to_broadcast.msg_type.value(), hash_of_item_to_broadcast ) );
4873  }
4874 
4875  void node_impl::broadcast( const message& item_to_broadcast )
4876  {
4878  // this version is called directly from the client
4880  broadcast( item_to_broadcast, propagation_data );
4881  }
4882 
4883  void node_impl::sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers)
4884  {
4887  _sync_item_type = current_head_block.item_type;
4888  _most_recent_blocks_accepted.push_back(current_head_block.item_hash);
4889  _hard_fork_block_numbers = hard_fork_block_numbers;
4890  }
4891 
4893  {
4895  return !_active_connections.empty();
4896  }
4897 
4898  std::vector<potential_peer_record> node_impl::get_potential_peers() const
4899  {
4901  std::vector<potential_peer_record> result;
4902  // use explicit iterators here, for some reason the mac compiler can't used ranged-based for loops here
4904  result.push_back(*itr);
4905  return result;
4906  }
4907 
4909  {
4911  if (params.contains("peer_connection_retry_timeout"))