BitShares-Core  7.0.2
BitShares blockchain node software and command-line wallet software
message_oriented_connection.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Cryptonomex, Inc., and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
24 #include <fc/thread/thread.hpp>
25 #include <fc/thread/mutex.hpp>
27 #include <fc/thread/future.hpp>
28 #include <fc/log/logger.hpp>
29 #include <fc/io/enum_type.hpp>
30 
33 #include <graphene/net/config.hpp>
34 
35 #include <atomic>
36 
37 #ifdef DEFAULT_LOGGER
38 # undef DEFAULT_LOGGER
39 #endif
40 #define DEFAULT_LOGGER "p2p"
41 
42 #ifndef NDEBUG
43 # define VERIFY_CORRECT_THREAD() assert(_thread->is_current())
44 #else
45 # define VERIFY_CORRECT_THREAD() do {} while (0)
46 #endif
47 
48 namespace graphene { namespace net {
49  namespace detail
50  {
52  {
53  private:
56  stcp_socket _sock;
57  fc::promise<void>::ptr _ready_for_sending;
58  fc::future<void> _read_loop_done;
59  uint64_t _bytes_received;
60  uint64_t _bytes_sent;
61 
62  fc::time_point _connected_time;
63  fc::time_point _last_message_received_time;
64  fc::time_point _last_message_sent_time;
65 
66  std::atomic_bool _send_message_in_progress;
67  std::atomic_bool _read_loop_in_progress;
68 #ifndef NDEBUG
69  fc::thread* _thread;
70 #endif
71 
72  void read_loop();
73  void start_read_loop();
74  public:
76  void accept();
77  void connect_to(const fc::ip::endpoint& remote_endpoint);
78  void bind(const fc::ip::endpoint& local_endpoint);
79 
81  message_oriented_connection_delegate* delegate = nullptr);
83 
84  void send_message(const message& message_to_send);
85  void close_connection();
86  void destroy_connection();
87 
88  uint64_t get_total_bytes_sent() const;
89  uint64_t get_total_bytes_received() const;
90 
93  fc::time_point get_connection_time() const { return _connected_time; }
95  };
96 
99  : _self(self),
100  _delegate(delegate),
101  _ready_for_sending(fc::promise<void>::create()),
102  _bytes_received(0),
103  _bytes_sent(0),
104  _send_message_in_progress(false),
105  _read_loop_in_progress(false)
106 #ifndef NDEBUG
107  ,_thread(&fc::thread::current())
108 #endif
109  {
110  }
112  {
115  }
116 
118  {
120  return _sock.get_socket();
121  }
122 
124  {
126  _sock.accept();
127  assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
128  _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
129  _ready_for_sending->set_value();
130  }
131 
133  {
135  _sock.connect_to(remote_endpoint);
136  assert(!_read_loop_done.valid()); // check to be sure we never launch two read loops
137  _read_loop_done = fc::async([=](){ read_loop(); }, "message read_loop");
138  _ready_for_sending->set_value();
139  }
140 
142  {
144  _sock.bind(local_endpoint);
145  }
146 
148  {
149  std::atomic_bool* _flag;
150  public:
151  explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag)
152  {
153  bool expected = false;
154  FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it");
155  }
157  {
158  *_flag = false;
159  }
160  };
161 
162  void message_oriented_connection_impl::read_loop()
163  {
165  const int BUFFER_SIZE = 16;
166  const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
167  static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");
168 
169  no_parallel_execution_guard guard( &_read_loop_in_progress );
170 
171  _connected_time = fc::time_point::now();
172 
173  fc::oexception exception_to_rethrow;
174  bool call_on_connection_closed = false;
175  bool io_error = false;
176 
177  try
178  {
179  message m;
180  char buffer[BUFFER_SIZE];
181  while( true )
182  {
183  try {
184  _sock.read(buffer, BUFFER_SIZE);
185  } catch ( const fc::canceled_exception& ) {
186  io_error = true;
187  throw;
188  }
189  _bytes_received += BUFFER_SIZE;
190  memcpy((char*)&m, buffer, sizeof(message_header));
191  FC_ASSERT( m.size.value() <= MAX_MESSAGE_SIZE, "", ("m.size",m.size.value())("MAX_MESSAGE_SIZE",MAX_MESSAGE_SIZE) );
192 
193  size_t remaining_bytes_with_padding = 16 * ((m.size.value() - LEFTOVER + 15) / 16);
194  m.data.resize(LEFTOVER + remaining_bytes_with_padding); //give extra 16 bytes to allow for padding added in send call
195  std::copy(buffer + sizeof(message_header), buffer + sizeof(buffer), m.data.begin());
196  if (remaining_bytes_with_padding)
197  {
198  try {
199  _sock.read(&m.data[LEFTOVER], remaining_bytes_with_padding);
200  } catch ( const fc::canceled_exception& ) {
201  io_error = true;
202  throw;
203  }
204  _bytes_received += remaining_bytes_with_padding;
205  }
206  m.data.resize(m.size.value()); // truncate off the padding bytes
207 
208  _last_message_received_time = fc::time_point::now();
209 
210  try
211  {
212  // message handling errors are warnings...
213  _delegate->on_message(_self, m);
214  }
216  catch ( const fc::canceled_exception& e ) { throw; }
217  catch ( const fc::eof_exception& e ) { throw; }
218  catch ( const fc::exception& e)
219  {
221  wlog( "message transmission failed ${er}", ("er", e.to_detail_string() ) );
222  throw;
223  }
224  }
225  }
226  catch ( const fc::canceled_exception& e )
227  {
228  if( io_error )
229  {
230  wlog( "disconnected on io error ${e}", ("e", e.to_detail_string() ) );
231  call_on_connection_closed = true;
232  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected on io error: ${e}",
233  ("e", e.to_detail_string())));
234  }
235  else
236  {
237  wlog( "caught a canceled_exception in read_loop. this should mean we're in the process of deleting "
238  "this object already, so there's no need to notify the delegate: ${e}",
239  ("e", e.to_detail_string() ) );
240  throw;
241  }
242  }
243  catch ( const fc::eof_exception& e )
244  {
245  wlog( "disconnected ${e}", ("e", e.to_detail_string() ) );
246  call_on_connection_closed = true;
247  }
248  catch ( const fc::exception& e )
249  {
250  elog( "disconnected ${er}", ("er", e.to_detail_string() ) );
251  call_on_connection_closed = true;
252  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.to_detail_string())));
253  }
254  catch ( const std::exception& e )
255  {
256  elog( "disconnected ${er}", ("er", e.what() ) );
257  call_on_connection_closed = true;
258  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", e.what())));
259  }
260  catch ( ... )
261  {
262  elog( "unexpected exception" );
263  call_on_connection_closed = true;
264  exception_to_rethrow = fc::unhandled_exception(FC_LOG_MESSAGE(warn, "disconnected: ${e}", ("e", fc::except_str())));
265  }
266 
267  if (call_on_connection_closed)
268  _delegate->on_connection_closed(_self);
269 
270  if (exception_to_rethrow)
271  throw *exception_to_rethrow;
272  }
273 
275  {
277 #if 0 // this gets too verbose
278 #ifndef NDEBUG
279  fc::optional<fc::ip::endpoint> remote_endpoint;
280  if (_sock.get_socket().is_open())
281  remote_endpoint = _sock.get_socket().remote_endpoint();
282  struct scope_logger {
284  scope_logger(const fc::optional<fc::ip::endpoint>& endpoint) : endpoint(endpoint) { dlog("entering message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
285  ~scope_logger() { dlog("leaving message_oriented_connection::send_message() for peer ${endpoint}", ("endpoint", endpoint)); }
286  } send_message_scope_logger(remote_endpoint);
287 #endif
288 #endif
289  no_parallel_execution_guard guard( &_send_message_in_progress );
290  _ready_for_sending->wait();
291 
292  try
293  {
294  size_t size_of_message_and_header = sizeof(message_header) + message_to_send.size.value();
295  if( message_to_send.size.value() > MAX_MESSAGE_SIZE )
296  elog("Trying to send a message larger than MAX_MESSAGE_SIZE. This probably won't work...");
297  //pad the message we send to a multiple of 16 bytes
298  size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
299  std::vector<char> padded_message( size_with_padding );
300 
301  memcpy( padded_message.data(), (const char*)&message_to_send, sizeof(message_header) );
302  memcpy( padded_message.data() + sizeof(message_header), message_to_send.data.data(),
303  message_to_send.size.value() );
304  char* padding_space = padded_message.data() + sizeof(message_header) + message_to_send.size.value();
305  memset(padding_space, 0, size_with_padding - size_of_message_and_header);
306  _sock.write( padded_message.data(), size_with_padding );
307  _sock.flush();
308  _bytes_sent += size_with_padding;
309  _last_message_sent_time = fc::time_point::now();
310  } FC_RETHROW_EXCEPTIONS( warn, "unable to send message" )
311  }
312 
314  {
316  _sock.close();
317  }
318 
320  {
322 
323  fc::optional<fc::ip::endpoint> remote_endpoint;
324  if (_sock.get_socket().is_open())
325  remote_endpoint = _sock.get_socket().remote_endpoint();
326  ilog( "in destroy_connection() for ${endpoint}", ("endpoint", remote_endpoint) );
327 
328  if (_send_message_in_progress)
329  elog("Error: message_oriented_connection is being destroyed while a send_message is in progress. "
330  "The task calling send_message() should have been canceled already");
331  assert(!_send_message_in_progress);
332 
333  try
334  {
335  _read_loop_done.cancel_and_wait(__FUNCTION__);
336  }
337  catch ( const fc::exception& e )
338  {
339  wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring: ${e}", ("e",e) );
340  }
341  catch (...)
342  {
343  wlog( "Exception thrown while canceling message_oriented_connection's read_loop, ignoring" );
344  }
345  _ready_for_sending->set_exception( std::make_shared<fc::canceled_exception>() );
346  }
347 
349  {
351  return _bytes_sent;
352  }
353 
355  {
357  return _bytes_received;
358  }
359 
361  {
363  return _last_message_sent_time;
364  }
365 
367  {
369  return _last_message_received_time;
370  }
371 
373  {
375  return _sock.get_shared_secret();
376  }
377 
378  } // end namespace graphene::net::detail
379 
380 
382  my( std::make_unique<detail::message_oriented_connection_impl>(this, delegate) )
383  {
384  }
385 
387  {
388  }
389 
391  {
392  return my->get_socket();
393  }
394 
396  {
397  my->accept();
398  }
399 
401  {
402  my->connect_to(remote_endpoint);
403  }
404 
406  {
407  my->bind(local_endpoint);
408  }
409 
411  {
412  my->send_message(message_to_send);
413  }
414 
416  {
417  my->close_connection();
418  }
419 
421  {
422  my->destroy_connection();
423  }
424 
426  {
427  return my->get_total_bytes_sent();
428  }
429 
431  {
432  return my->get_total_bytes_received();
433  }
434 
436  {
437  return my->get_last_message_sent_time();
438  }
439 
441  {
442  return my->get_last_message_received_time();
443  }
445  {
446  return my->get_connection_time();
447  }
449  {
450  return my->get_shared_secret();
451  }
452 
453 } } // end namespace graphene::net
fc::promise
Definition: future.hpp:109
fc::copy
void copy(const path &from, const path &to)
Definition: filesystem.cpp:241
graphene::net::stcp_socket::flush
virtual void flush()
Definition: stcp_socket.cpp:166
wlog
#define wlog(FORMAT,...)
Definition: logger.hpp:123
graphene::net::stcp_socket::close
virtual void close()
Definition: stcp_socket.cpp:172
fc::tcp_socket::is_open
bool is_open() const
Definition: tcp_socket.cpp:94
fc::exception
Used to generate a useful error report when an exception is thrown.
Definition: exception.hpp:56
graphene::net::message_oriented_connection::get_total_bytes_sent
uint64_t get_total_bytes_sent() const
Definition: message_oriented_connection.cpp:425
fc::promise< void >::set_value
void set_value()
Definition: future.hpp:181
graphene::net::detail::message_oriented_connection_impl::bind
void bind(const fc::ip::endpoint &local_endpoint)
Definition: message_oriented_connection.cpp:141
graphene::net::message_oriented_connection_delegate
Definition: message_oriented_connection.hpp:35
graphene::net::message_oriented_connection::connect_to
void connect_to(const fc::ip::endpoint &remote_endpoint)
Definition: message_oriented_connection.cpp:400
fc
Definition: api.hpp:15
graphene::net::detail::message_oriented_connection_impl::get_total_bytes_received
uint64_t get_total_bytes_received() const
Definition: message_oriented_connection.cpp:354
graphene::net::message
Definition: message.hpp:58
scoped_lock.hpp
graphene::net::message_oriented_connection
Definition: message_oriented_connection.hpp:43
graphene::net::detail::message_oriented_connection_impl::send_message
void send_message(const message &message_to_send)
Definition: message_oriented_connection.cpp:274
graphene::net::stcp_socket
Definition: stcp_socket.hpp:35
graphene::net::detail::message_oriented_connection_impl::get_last_message_sent_time
fc::time_point get_last_message_sent_time() const
Definition: message_oriented_connection.cpp:360
graphene::net::message_oriented_connection::~message_oriented_connection
~message_oriented_connection()
Definition: message_oriented_connection.cpp:386
graphene::net::message_oriented_connection::get_socket
fc::tcp_socket & get_socket()
Definition: message_oriented_connection.cpp:390
graphene::net::detail::message_oriented_connection_impl::~message_oriented_connection_impl
~message_oriented_connection_impl()
Definition: message_oriented_connection.cpp:111
graphene::net::detail::message_oriented_connection_impl::get_last_message_received_time
fc::time_point get_last_message_received_time() const
Definition: message_oriented_connection.cpp:366
graphene::net::detail::message_oriented_connection_impl
Definition: message_oriented_connection.cpp:51
fc::asio::tcp::endpoint
boost::asio::ip::tcp::endpoint endpoint
Definition: asio.hpp:239
graphene::net::stcp_socket::get_shared_secret
fc::sha512 get_shared_secret() const
Definition: stcp_socket.hpp:58
fc::ip::endpoint
Definition: ip.hpp:65
fc::sha512
Definition: sha512.hpp:9
graphene::net::message_oriented_connection::get_last_message_sent_time
fc::time_point get_last_message_sent_time() const
Definition: message_oriented_connection.cpp:435
graphene::net::message_oriented_connection::get_last_message_received_time
fc::time_point get_last_message_received_time() const
Definition: message_oriented_connection.cpp:440
graphene::net::detail::no_parallel_execution_guard::~no_parallel_execution_guard
~no_parallel_execution_guard()
Definition: message_oriented_connection.cpp:156
graphene::net::message_oriented_connection::bind
void bind(const fc::ip::endpoint &local_endpoint)
Definition: message_oriented_connection.cpp:405
ilog
#define ilog(FORMAT,...)
Definition: logger.hpp:117
fc::promise< void >::wait
void wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:174
dlog
#define dlog(FORMAT,...)
Definition: logger.hpp:100
graphene::net::detail::no_parallel_execution_guard
Definition: message_oriented_connection.cpp:147
fc::thread
Definition: thread.hpp:39
thread.hpp
fc::async
auto async(Functor &&f, const char *desc FC_TASK_NAME_DEFAULT_ARG, priority prio=priority()) -> fc::future< decltype(f())>
Definition: thread.hpp:227
fc::istream::read
istream & read(char *buf, size_t len)
Definition: iostream.cpp:274
graphene::net::detail::message_oriented_connection_impl::get_connection_time
fc::time_point get_connection_time() const
Definition: message_oriented_connection.cpp:93
fc::tcp_socket
Definition: tcp_socket.hpp:14
fc::exception::to_detail_string
std::string to_detail_string(log_level ll=log_level::all) const
Definition: exception.cpp:183
fc::future< void >::valid
bool valid() const
Definition: future.hpp:311
fc::future< void >::cancel_and_wait
void cancel_and_wait(const char *reason FC_CANCELATION_REASON_DEFAULT_ARG)
Definition: future.hpp:314
graphene::net::detail::message_oriented_connection_impl::message_oriented_connection_impl
message_oriented_connection_impl(message_oriented_connection *self, message_oriented_connection_delegate *delegate=nullptr)
Definition: message_oriented_connection.cpp:97
graphene::net::detail::message_oriented_connection_impl::connect_to
void connect_to(const fc::ip::endpoint &remote_endpoint)
Definition: message_oriented_connection.cpp:132
fc::tcp_socket::remote_endpoint
fc::ip::endpoint remote_endpoint() const
Definition: tcp_socket.cpp:126
VERIFY_CORRECT_THREAD
#define VERIFY_CORRECT_THREAD()
Definition: message_oriented_connection.cpp:43
message_oriented_connection.hpp
fc::time_point::now
static time_point now()
Definition: time.cpp:13
FC_ASSERT
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
fc::unhandled_exception
re-thrown whenever an unhandled exception is caught.
Definition: exception.hpp:146
graphene::net::detail::message_oriented_connection_impl::get_shared_secret
fc::sha512 get_shared_secret() const
Definition: message_oriented_connection.cpp:372
future.hpp
fc::time_point
Definition: time.hpp:44
graphene::net::message_oriented_connection::get_shared_secret
fc::sha512 get_shared_secret() const
Definition: message_oriented_connection.cpp:448
graphene::net::message_header::size
boost::endian::little_uint32_buf_t size
Definition: message.hpp:43
std
Definition: zeroed_array.hpp:76
fc::promise_base::set_exception
void set_exception(const fc::exception_ptr &e)
Definition: future.cpp:44
FC_RETHROW_EXCEPTIONS
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catches all exceptions, std::exceptions, and ... and rethrows them after appending the provided log message.
Definition: exception.hpp:464
stcp_socket.hpp
enum_type.hpp
logger.hpp
graphene::net::stcp_socket::accept
void accept()
Definition: stcp_socket.cpp:180
graphene::net::message_oriented_connection_delegate::on_connection_closed
virtual void on_connection_closed(message_oriented_connection *originating_connection)=0
graphene::net::detail::no_parallel_execution_guard::no_parallel_execution_guard
no_parallel_execution_guard(std::atomic_bool *flag)
Definition: message_oriented_connection.cpp:151
MAX_MESSAGE_SIZE
#define MAX_MESSAGE_SIZE
Definition: config.hpp:41
graphene::net::stcp_socket::get_socket
fc::tcp_socket & get_socket()
Definition: stcp_socket.hpp:40
graphene::net::stcp_socket::connect_to
void connect_to(const fc::ip::endpoint &remote_endpoint)
Definition: stcp_socket.cpp:72
graphene::net::message_oriented_connection::destroy_connection
void destroy_connection()
Definition: message_oriented_connection.cpp:420
fc::optional
provides stack-based nullable value similar to boost::optional
Definition: optional.hpp:20
fc::except_str
std::string except_str()
Definition: exception.cpp:272
graphene::net::message_oriented_connection::send_message
void send_message(const message &message_to_send)
Definition: message_oriented_connection.cpp:410
fc::ostream::write
ostream & write(const char *buf, size_t len)
Definition: iostream.cpp:290
graphene::net::message_oriented_connection::accept
void accept()
Definition: message_oriented_connection.cpp:395
graphene::net::detail::message_oriented_connection_impl::get_socket
fc::tcp_socket & get_socket()
Definition: message_oriented_connection.cpp:117
graphene::net::message_oriented_connection::message_oriented_connection
message_oriented_connection(message_oriented_connection_delegate *delegate=nullptr)
Definition: message_oriented_connection.cpp:381
graphene::net::message_oriented_connection::close_connection
void close_connection()
Definition: message_oriented_connection.cpp:415
graphene::net::detail::message_oriented_connection_impl::accept
void accept()
Definition: message_oriented_connection.cpp:123
config.hpp
graphene::net::message_oriented_connection::get_total_bytes_received
uint64_t get_total_bytes_received() const
Definition: message_oriented_connection.cpp:430
graphene::net::detail::message_oriented_connection_impl::close_connection
void close_connection()
Definition: message_oriented_connection.cpp:313
graphene::net::message_oriented_connection_delegate::on_message
virtual void on_message(message_oriented_connection *originating_connection, const message &received_message)=0
graphene
Definition: api.cpp:48
fc::future< void >
Definition: future.hpp:283
graphene::net::detail::message_oriented_connection_impl::get_total_bytes_sent
uint64_t get_total_bytes_sent() const
Definition: message_oriented_connection.cpp:348
FC_LOG_MESSAGE
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
Definition: log_message.hpp:163
graphene::net::message_header
Definition: message.hpp:41
mutex.hpp
graphene::net::detail::message_oriented_connection_impl::destroy_connection
void destroy_connection()
Definition: message_oriented_connection.cpp:319
elog
#define elog(FORMAT,...)
Definition: logger.hpp:129
graphene::net::message_oriented_connection::get_connection_time
fc::time_point get_connection_time() const
Definition: message_oriented_connection.cpp:444
graphene::net::stcp_socket::bind
void bind(const fc::ip::endpoint &local_endpoint)
Definition: stcp_socket.cpp:78