// Rebind the default log channel for the code that follows: drop any earlier
// DEFAULT_LOGGER definition, then define it as "p2p".
38 # undef DEFAULT_LOGGER
40 #define DEFAULT_LOGGER "p2p"
// VERIFY_CORRECT_THREAD() has two alternative definitions. This first one
// asserts that _thread->is_current() holds at the call site.
// NOTE(review): the preprocessor conditional that selects between the two
// definitions is not visible in this excerpt - confirm which build uses which.
43 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
// No-op variant: expands to an empty do/while statement, so call sites still
// need (and accept) a trailing semicolon.
45 # define VERIFY_CORRECT_THREAD() do {} while (0)
59 uint64_t _bytes_received;
66 std::atomic_bool _send_message_in_progress;
67 std::atomic_bool _read_loop_in_progress;
73 void start_read_loop();
101 _ready_for_sending(
fc::promise<void>::create()),
104 _send_message_in_progress(false),
105 _read_loop_in_progress(false)
107 ,_thread(&
fc::thread::current())
127 assert(!_read_loop_done.
valid());
128 _read_loop_done =
fc::async([=](){ read_loop(); },
"message read_loop");
136 assert(!_read_loop_done.
valid());
137 _read_loop_done =
fc::async([=](){ read_loop(); },
"message read_loop");
144 _sock.
bind(local_endpoint);
149 std::atomic_bool* _flag;
153 bool expected =
false;
154 FC_ASSERT( flag->compare_exchange_strong( expected,
true ),
"Only one thread at time can visit it");
162 void message_oriented_connection_impl::read_loop()
// Size in bytes of the fixed-size first chunk; it is used below both as the
// length of the local `buffer` array and as the byte count of the first
// socket read.
165 const int BUFFER_SIZE = 16;
// Compile-time guard: that first chunk must be large enough to contain a
// complete message_header, because the header is memcpy'd out of it.
167 static_assert(BUFFER_SIZE >=
sizeof(
message_header),
"insufficient buffer");
169 no_parallel_execution_guard guard( &_read_loop_in_progress );
174 bool call_on_connection_closed =
false;
175 bool io_error =
false;
180 char buffer[BUFFER_SIZE];
184 _sock.
read(buffer, BUFFER_SIZE);
185 }
catch (
const fc::canceled_exception& ) {
189 _bytes_received += BUFFER_SIZE;
190 memcpy((
char*)&m, buffer,
sizeof(message_header));
// Number of bytes still to be fetched from the socket, rounded up to the next
// multiple of 16.
// NOTE(review): LEFTOVER is declared outside this excerpt; presumably it is
// the number of payload bytes already sitting in `buffer` after the header
// (BUFFER_SIZE - sizeof(message_header)) - confirm. Also confirm the intended
// result when m.size.value() is smaller than LEFTOVER.
193 size_t remaining_bytes_with_padding = 16 * ((m.size.value() - LEFTOVER + 15) / 16);
// Make room for the bytes already buffered plus the padded remainder.
194 m.data.resize(LEFTOVER + remaining_bytes_with_padding);
// Move everything in `buffer` that follows the header to the front of m.data.
195 std::copy(buffer +
sizeof(message_header), buffer +
sizeof(buffer), m.data.begin());
196 if (remaining_bytes_with_padding)
199 _sock.
read(&m.data[LEFTOVER], remaining_bytes_with_padding);
200 }
catch (
const fc::canceled_exception& ) {
204 _bytes_received += remaining_bytes_with_padding;
206 m.data.resize(m.size.value());
216 catch (
const fc::canceled_exception& e ) {
throw; }
217 catch (
const fc::eof_exception& e ) {
throw; }
226 catch (
const fc::canceled_exception& e )
230 wlog(
"disconnected on io error ${e}", (
"e", e.to_detail_string() ) );
231 call_on_connection_closed =
true;
233 (
"e", e.to_detail_string())));
237 wlog(
"caught a canceled_exception in read_loop. this should mean we're in the process of deleting "
238 "this object already, so there's no need to notify the delegate: ${e}",
239 (
"e", e.to_detail_string() ) );
243 catch (
const fc::eof_exception& e )
245 wlog(
"disconnected ${e}", (
"e", e.to_detail_string() ) );
246 call_on_connection_closed =
true;
251 call_on_connection_closed =
true;
254 catch (
const std::exception& e )
256 elog(
"disconnected ${er}", (
"er", e.what() ) );
257 call_on_connection_closed =
true;
262 elog(
"unexpected exception" );
263 call_on_connection_closed =
true;
267 if (call_on_connection_closed)
270 if (exception_to_rethrow)
271 throw *exception_to_rethrow;
277 #if 0 // this gets too verbose
282 struct scope_logger {
285 ~scope_logger() {
dlog(
"leaving message_oriented_connection::send_message() for peer ${endpoint}", (
"endpoint",
endpoint)); }
286 } send_message_scope_logger(remote_endpoint);
290 _ready_for_sending->
wait();
294 size_t size_of_message_and_header =
sizeof(
message_header) + message_to_send.
size.value();
296 elog(
"Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work...");
// Round the header + payload size up to the next multiple of 16; this padded
// length is what is actually written to the socket.
298 size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
// Staging buffer for the padded message (std::vector<char>(n) value-initializes
// its elements, i.e. starts out zero-filled).
299 std::vector<char> padded_message( size_with_padding );
// Copy the first sizeof(message_header) bytes of the message object into the
// front of the staging buffer.
// NOTE(review): this assumes the message object begins with its
// message_header representation - confirm against the message definition.
301 memcpy( padded_message.data(), (
const char*)&message_to_send,
sizeof(
message_header) );
// Append the payload bytes directly after the header.
302 memcpy( padded_message.data() +
sizeof(
message_header), message_to_send.data.data(),
303 message_to_send.size.value() );
// Explicitly zero the tail between the end of the payload and the padded
// length.
304 char* padding_space = padded_message.data() +
sizeof(
message_header) + message_to_send.size.value();
305 memset(padding_space, 0, size_with_padding - size_of_message_and_header);
// Send header, payload and padding with a single write call.
306 _sock.
write( padded_message.data(), size_with_padding );
// The sent-bytes counter is advanced by the padded length, not the payload
// length.
308 _bytes_sent += size_with_padding;
326 ilog(
"in destroy_connection() for ${endpoint}", (
"endpoint", remote_endpoint) );
// A send that is still flagged as in progress here is reported as an error in
// the log before the assertion below is evaluated.
328 if (_send_message_in_progress)
329 elog(
"Error: message_oriented_connection is being destroyed while a send_message is in progress. "
330 "The task calling send_message() should have been canceled already");
// Same condition as a hard check; with assert compiled out (NDEBUG) only the
// log line above remains.
331 assert(!_send_message_in_progress);
339 wlog(
"Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", (
"e",e) );
343 wlog(
"Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
345 _ready_for_sending->
set_exception( std::make_shared<fc::canceled_exception>() );
357 return _bytes_received;
363 return _last_message_sent_time;
369 return _last_message_received_time;
382 my(
std::make_unique<detail::message_oriented_connection_impl>(this, delegate) )
392 return my->get_socket();
402 my->connect_to(remote_endpoint);
407 my->bind(local_endpoint);
412 my->send_message(message_to_send);
417 my->close_connection();
422 my->destroy_connection();
427 return my->get_total_bytes_sent();
432 return my->get_total_bytes_received();
437 return my->get_last_message_sent_time();
442 return my->get_last_message_received_time();
446 return my->get_connection_time();
450 return my->get_shared_secret();