32 #include <boost/scope_exit.hpp>
// Replace any previously defined DEFAULT_LOGGER with "p2p" for the rest of this file.
// (The preprocessor conditional surrounding the #undef, if any, is not visible in this excerpt.)
35 # undef DEFAULT_LOGGER
37 #define DEFAULT_LOGGER "p2p"
// VERIFY_CORRECT_THREAD(): two alternative definitions of the same macro. The preprocessor
// conditional that chooses between them lies on lines not included in this excerpt.
// Checking variant: asserts _thread->is_current().
40 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
// No-op variant: expands to an empty do/while so call sites still take a trailing ';'.
42 # define VERIFY_CORRECT_THREAD() do {} while (0)
// Returns the stored message_to_send. When a send-time field offset was recorded, the bytes
// of `packed_current_time` are first copied over the message data at that offset.
// `packed_current_time` is built on lines not shown here (presumably the serialized current
// time -- confirm against the full file). The delegate parameter is unnamed in this signature.
47 message peer_connection::real_queued_message::get_message(peer_connection_delegate*)
// (size_t)-1 is the "no offset" sentinel: patching only happens for any other value.
49 if (message_send_time_field_offset != (
size_t)-1)
// The patched range must fit inside the message data; this is enforced only by assert.
54 assert(message_send_time_field_offset + packed_current_time.size() <= message_to_send.data.size());
55 memcpy(message_to_send.data.data() + message_send_time_field_offset,
56 packed_current_time.data(), packed_current_time.size());
58 return message_to_send;
// Size charged against the send queue for a real message: the byte count of its data buffer.
60 size_t peer_connection::real_queued_message::get_size_in_queue()
62 return message_to_send.data.size();
// Produces the message on demand by asking the delegate `node` for the message that
// corresponds to item_to_send. `node` is dereferenced without a null check.
64 message peer_connection::virtual_queued_message::get_message(peer_connection_delegate* node)
66 return node->get_message_for_item(item_to_send);
// Size charged against the send queue for a virtual message: the constant sizeof(item_id),
// not the size of the message that get_message() will later return.
69 size_t peer_connection::virtual_queued_message::get_size_in_queue()
71 return sizeof(item_id);
// Member-initializer fragment of what appears to be the peer_connection constructor. The
// constructor signature and several initializers lie on lines outside this excerpt.
// _message_connection is handed `this`.
76 _message_connection(this),
77 _total_queued_messages_size(0),
// Neither close-request flag is set initially.
80 they_have_requested_close(false),
82 we_have_requested_close(false),
84 number_of_unfetched_item_ids(0),
// Both sync flags start true; sync-block fetching starts un-inhibited.
85 peer_needs_sync_items_from_us(true),
86 we_need_sync_items_from_peer(true),
87 inhibit_fetching_sync_blocks(false),
// Initial "inhibited until" value is fc::time_point::min() (presumably the earliest
// representable time, i.e. no inhibition in effect -- confirm against fc).
88 transaction_fetching_inhibited_until(
fc::time_point::min()),
89 last_known_fork_block_number(0),
// Records the constructing thread; VERIFY_CORRECT_THREAD() checks _thread->is_current().
91 _thread(&
fc::thread::current()),
// Counter that send_queued_messages_task() asserts on at entry and exit.
92 _send_message_queue_tasks_running(0),
94 _currently_handling_message(false)
// Factory fragment: builds a peer_connection_subclass with std::make_shared, passing
// `delegate`. The enclosing function's signature and the definition of
// peer_connection_subclass are on lines not included in this excerpt.
115 return std::make_shared<peer_connection_subclass>(delegate);
// Shuts the connection down. Judging by the log messages in the visible lines, the work is
// done in three steps: close_connection(), cancelling the send-queued-messages task, and
// cancelling the accept_or_connect task. The statements that perform those steps, and the
// try/catch scaffolding around them, are on lines not included in this excerpt.
118 void peer_connection::destroy()
// Disabled block; it presumably covers the scope_logger helper below (the matching #endif
// is not visible here).
122 #if 0 // this gets too verbose
124 struct scope_logger {
127 ~scope_logger() {
dlog(
"leaving peer_connection::destroy() for peer ${endpoint}", (
"endpoint",
endpoint)); }
// Step 1: close the connection, logging before and after.
134 dlog(
"calling close_connection()");
136 dlog(
"close_connection completed normally");
// Cancellation while closing is treated as a programming error: it trips an assert
// (active only in builds where assert is enabled).
138 catch (
const fc::canceled_exception& )
140 assert(
false &&
"the task that deletes peers should not be canceled because it will prevent us from cleaning up correctly");
// Logged from a further handler whose catch clause is not visible.
144 dlog(
"close_connection threw");
// Step 2: cancel the task that drains the send queue.
149 dlog(
"canceling _send_queued_messages task");
151 dlog(
"cancel_and_wait completed normally");
// Failures during step 2 are reported at warning level; one variant has the exception
// object `e` available, the other does not.
155 wlog(
"Unexpected exception from peer_connection's send_queued_messages_task : ${e}", (
"e", e));
159 wlog(
"Unexpected exception from peer_connection's send_queued_messages_task");
// Step 3: cancel the accept/connect task, with the same logging pattern as step 2.
164 dlog(
"canceling accept_or_connect_task");
166 dlog(
"accept_or_connect_task completed normally");
170 wlog(
"Unexpected exception from peer_connection's accept_or_connect_task : ${e}", (
"e", e));
174 wlog(
"Unexpected exception from peer_connection's accept_or_connect_task");
// Fragment of the inbound-accept routine; the log text names it
// peer_connection::accept_connection(), but its signature is outside this excerpt.
// Local helper whose constructor and destructor log entry to and exit from the routine.
// The exit message comes from the destructor of the local object declared below.
196 struct scope_logger {
197 scope_logger() {
dlog(
"entering peer_connection::accept_connection()"); }
198 ~scope_logger() {
dlog(
"leaving peer_connection::accept_connection()"); }
199 } accept_connection_scope_logger;
// Accept on the underlying message connection.
207 _message_connection.
accept();
// Info-level log for an established inbound connection; the arguments that fill in
// ${remote_endpoint} are on lines not shown.
220 ilog(
"established inbound connection from ${remote_endpoint}, sending hello",
// Fragment of the outbound-connect routine (signature outside this excerpt). Visible flow:
// remember the remote endpoint, try to bind to `local_endpoint` (tested for truthiness and
// dereferenced below), connect, and -- if the connect failed after a successful bind --
// connect a second time. The try blocks and some handler bodies are not visible.
240 _remote_endpoint = remote_endpoint;
// Tracks whether the bind attempt below failed, so the later retry logic can tell.
241 bool failed_to_bind =
false;
250 _message_connection.
bind( *local_endpoint );
// Handler for cancellation; its body is not visible (presumably rethrows -- confirm).
252 catch (
const fc::canceled_exception& )
// Bind failure is not fatal: it is recorded and logged as a warning.
258 failed_to_bind =
true;
259 wlog(
"Failed to bind to desired local endpoint ${endpoint}, will connect using an OS-selected "
260 "endpoint: ${except}",
261 (
"endpoint", *local_endpoint )(
"except", except ) );
// First connect attempt.
268 _message_connection.
connect_to( remote_endpoint );
// Handler for cancellation; its body is not visible (presumably rethrows -- confirm).
270 catch (
const fc::canceled_exception& )
// This branch is taken only when a local endpoint was requested and binding to it worked.
276 if( local_endpoint && !failed_to_bind )
279 wlog(
"Failed to connect to remote endpoint ${remote_endpoint} from local endpoint ${local_endpoint}, "
280 "will connect using an OS-selected endpoint: ${except}",
281 (
"remote_endpoint", remote_endpoint )(
"local_endpoint", *local_endpoint )(
"except", except ) );
// Second connect attempt (the steps between the warning and this call are not visible).
290 _message_connection.
connect_to( remote_endpoint );
296 ilog(
"established outbound connection to ${remote_endpoint}", (
"remote_endpoint", remote_endpoint ) );
// Warning for a connect error; the arguments are on lines not shown.
300 wlog(
"error connecting to peer ${remote_endpoint}: ${e}",
// Tail of the signature of the handler that receives `received_message`; the start of the
// signature and the rest of the body are outside this excerpt.
307 const message& received_message )
// Mark the connection as handling a message for the duration of this scope.
310 _currently_handling_message =
true;
// BOOST_SCOPE_EXIT clears the flag again when the enclosing scope is left, whether by
// normal return or by exception. `this_` is Boost.ScopeExit's spelling for a captured `this`.
311 BOOST_SCOPE_EXIT(this_) {
312 this_->_currently_handling_message =
false;
313 } BOOST_SCOPE_EXIT_END
// Drains _queued_messages: while the queue is non-empty, the front entry is turned into a
// message, then removed from the queue with its size subtracted from
// _total_queued_messages_size. The send call itself and the try/catch scaffolding are on
// lines not included in this excerpt.
324 void peer_connection::send_queued_messages_task()
// Members of a local guard object (its struct header is not visible). It wraps
// _send_message_queue_tasks_running: the constructor asserts the counter is 0 and
// increments it, so a second overlapping invocation would trip the assert.
329 unsigned& _send_message_queue_tasks_counter;
330 counter(
unsigned& var) : _send_message_queue_tasks_counter(var) { assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; }
// The destructor asserts the counter is 1 and decrements it back.
331 ~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; }
332 } concurrent_invocation_counter(_send_message_queue_tasks_running);
334 while (!_queued_messages.empty())
// The entry stays at the front of the queue while its message is being built.
337 message message_to_send = _queued_messages.front()->get_message(_node);
// Cancellation is logged and, according to the log text, rethrown.
347 catch (
const fc::canceled_exception&)
349 dlog(
"message_oriented_connection::send_message() was canceled, rethrowing canceled_exception");
// Send error: per the log text the connection is closed in response.
354 wlog(
"Error sending message: ${exception}. Closing connection.", (
"exception", send_error));
// A failure during that close is itself only logged.
361 wlog(
"Caught error while closing connection: ${exception}", (
"exception", close_error));
// NOTE(review): the next two log messages say "message_oriented_exception::send_message()"
// while the one above says "message_oriented_connection::send_message()" -- looks like a
// typo in the log text; confirm before changing the strings.
365 catch (
const std::exception& e)
367 wlog(
"message_oriented_exception::send_message() threw a std::exception(): ${what}", (
"what", e.what()));
371 wlog(
"message_oriented_exception::send_message() threw an unhandled exception");
// Release the entry's accounted size, then drop it from the queue.
374 _total_queued_messages_size -= _queued_messages.front()->get_size_in_queue();
375 _queued_messages.pop();
// Fragment of the routine that enqueues a queued-message object and makes sure a send task
// is running (its signature is outside this excerpt).
// Account for the new entry's size, then move it into the queue.
383 _total_queued_messages_size += message_to_send->get_size_in_queue();
384 _queued_messages.emplace(std::move(message_to_send));
// Warning for a queue that has grown past its maximum size; the comparison that triggers
// it and the log arguments are on lines not shown.
387 wlog(
"send queue exceeded maximum size of ${max} bytes (current size ${current} bytes)",
// An error raised while closing the connection is logged.
395 wlog(
"Caught error while closing connection: ${exception}", (
"exception", e));
// Detects a send task that exists but was canceled; the action taken is not visible here.
400 if( _send_queued_messages_done.
valid() && _send_queued_messages_done.
canceled() )
// Start a new send task only if there is none yet or the previous one is ready().
403 if (!_send_queued_messages_done.
valid() || _send_queued_messages_done.
ready())
// The lambda captures `this`, so the object has to outlive the task it starts.
406 _send_queued_messages_done =
fc::async([
this](){ send_queued_messages_task(); },
"send_queued_messages_task");
// Fragment (enclosing signature not visible): wraps an already-built message, together
// with its send-time field offset, in a real_queued_message owned by a unique_ptr.
417 auto message_to_enqueue = std::make_unique<real_queued_message>(
418 message_to_send, message_send_time_field_offset );
// Fragment (enclosing signature not visible): wraps an item id in a virtual_queued_message,
// whose get_message() obtains the actual message from the delegate when it is called.
427 auto message_to_enqueue = std::make_unique<virtual_queued_message>(item_to_send);
// Accessor fragment (signature not visible): returns the stored _remote_endpoint.
474 return _remote_endpoint;
// Mutator fragment (signature not visible): overwrites _remote_endpoint with the new value.
485 _remote_endpoint = new_remote_endpoint;
// Accessor fragment (signature not visible): returns the _currently_handling_message flag,
// which is set and cleared around message handling elsewhere in this file.
503 return _currently_handling_message;
// Fragment of the routine that expires old inventory entries (signature not visible; the
// purpose is taken from the log text below). Each count is the number of entries between
// begin_iter and oldest_inventory_to_keep_iter; the two statements reuse the same iterator
// names, so the iterators must be re-established on lines not shown between them.
// NOTE(review): std::distance returns a signed difference type; storing it in `unsigned`
// converts implicitly -- confirm the iterators are always ordered begin <= keep.
526 unsigned number_of_elements_advertised_to_peer_to_discard = std::distance(begin_iter, oldest_inventory_to_keep_iter);
532 unsigned number_of_elements_peer_advertised_to_discard = std::distance(begin_iter, oldest_inventory_to_keep_iter);
// Debug log summarising both discard counts; the arguments are on lines not shown.
534 dlog(
"Expiring old inventory for peer ${peer}: removing ${to_peer} items advertised to peer (${remain_to_peer} left), and ${to_us} advertised to us (${remain_to_us} left)",