BitShares-Core  7.0.2
BitShares blockchain node software and command-line wallet software
asio.cpp
Go to the documentation of this file.
1 #include <fc/asio.hpp>
2 #include <fc/thread/thread.hpp>
3 #include <boost/thread.hpp>
4 #include <fc/log/logger.hpp>
6 #include <boost/scope_exit.hpp>
7 #include <algorithm>
8 #include <thread>
9 
10 namespace fc {
11  namespace asio {
12  namespace detail {
13 
// Member-initializer list and body of the read_write_handler constructor.
// The signature line (listing line 14) is not present in this listing.
// Stores the completion promise that operator() later fulfils; the body is
// otherwise empty.
15  _completion_promise(completion_promise)
16  {
17  // assert(false); // to detect anywhere we're not passing in a shared buffer
18  }
19  void read_write_handler::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
20  {
21  // assert(false); // to detect anywhere we're not passing in a shared buffer
22  if( !ec )
23  _completion_promise->set_value(bytes_transferred);
24  else if( ec == boost::asio::error::eof )
25  {
26  _completion_promise->set_exception( std::make_shared<fc::eof_exception>(
27  FC_LOG_MESSAGE( error, "${message} ",
28  ("message", boost::system::system_error(ec).what())) ) );
29  }
30  else if( ec == boost::asio::error::operation_aborted )
31  {
32  _completion_promise->set_exception( std::make_shared<fc::canceled_exception>(
33  FC_LOG_MESSAGE( error, "${message} ",
34  ("message", boost::system::system_error(ec).what())) ) );
35  }
36  else
37  {
38  _completion_promise->set_exception( std::make_shared<fc::exception>(
39  FC_LOG_MESSAGE( error, "${message} ",
40  ("message", boost::system::system_error(ec).what())) ) );
41  }
42  }
// Tail of the read_write_handler_with_buffer constructor. The line that opens
// the signature (listing line 43) is not present in this listing.
// Stores the completion promise and a shared_ptr to the buffer in the
// handler; the body is empty.
44  const std::shared_ptr<const char>& buffer) :
45  _completion_promise(completion_promise),
46  _buffer(buffer)
47  {}
48  void read_write_handler_with_buffer::operator()(const boost::system::error_code& ec, size_t bytes_transferred)
49  {
50  if( !ec )
51  _completion_promise->set_value(bytes_transferred);
52  else if( ec == boost::asio::error::eof )
53  {
54  _completion_promise->set_exception( std::make_shared<fc::eof_exception>(
55  FC_LOG_MESSAGE( error, "${message} ",
56  ("message", boost::system::system_error(ec).what())) ) );
57  }
58  else if( ec == boost::asio::error::operation_aborted )
59  {
60  _completion_promise->set_exception( std::make_shared<fc::canceled_exception>(
61  FC_LOG_MESSAGE( error, "${message} ",
62  ("message", boost::system::system_error(ec).what())) ) );
63  }
64  else
65  {
66  _completion_promise->set_exception( std::make_shared<fc::exception>(
67  FC_LOG_MESSAGE( error, "${message} ",
68  ("message", boost::system::system_error(ec).what())) ) );
69  }
70  }
71 
72  void read_write_handler_ec( promise<size_t>* p, boost::system::error_code* oec,
73  const boost::system::error_code& ec, size_t bytes_transferred ) {
74  p->set_value(bytes_transferred);
75  *oec = ec;
76  }
// Tail of detail::error_handler. The line that opens the signature (listing
// line 77) is not present in this listing; the cross-reference index gives it
// as error_handler(const promise<void>::ptr& p, const error_code& ec).
// Fulfils p with no value when ec is clear; otherwise sets an exception on p:
// fc::eof_exception for end-of-file, fc::canceled_exception for an aborted
// operation, fc::exception for anything else.
78  const boost::system::error_code& ec ) {
79  if( !ec )
80  p->set_value();
81  else
82  {
83  if( ec == boost::asio::error::eof )
84  {
85  p->set_exception( std::make_shared<fc::eof_exception>(
86  FC_LOG_MESSAGE( error, "${message} ",
87  ("message", boost::system::system_error(ec).what())) ) );
88  }
89  else if( ec == boost::asio::error::operation_aborted )
90  {
91  p->set_exception( std::make_shared<fc::canceled_exception>(
92  FC_LOG_MESSAGE( error, "${message} ",
93  ("message", boost::system::system_error(ec).what())) ) );
94  }
95  else
96  {
97  p->set_exception( std::make_shared<fc::exception>(
98  FC_LOG_MESSAGE( error, "${message} ",
99  ("message", boost::system::system_error(ec).what())) ) );
100  }
101  }
102  }
103 
// Tail of detail::error_handler_ec. The line that opens the signature
// (listing line 104) is not present in this listing; the cross-reference
// index gives it as error_handler_ec(promise<error_code>* p, const error_code& ec).
// Hands the error code to the promise as its value; no exception is set,
// even when ec signals a failure.
105  const boost::system::error_code& ec ) {
106  p->set_value(ec);
107  }
108 
// Completion handler for an asynchronous resolve. Listing lines 110 (the
// function name line) and 123 (the start of the exception expression) are
// not present in this listing.
//
// On success, copies every endpoint from itr up to a default-constructed
// IteratorType into a vector and fulfils p with it. On failure, sets an
// exception on p that carries the system error text.
109  template<typename EndpointType, typename IteratorType>
111  const typename promise<std::vector<EndpointType> >::ptr& p,
112  const boost::system::error_code& ec,
113  IteratorType itr) {
114  if( !ec ) {
115  std::vector<EndpointType> eps;
   // A default-constructed iterator is used as the end marker.
116  while( itr != IteratorType() ) {
117  eps.push_back(*itr);
118  ++itr;
119  }
120  p->set_value( eps );
121  } else {
122  p->set_exception(
   // NOTE(review): the message says "process exited with" although this
   // handler reports a resolve failure; the wording looks copied from
   // elsewhere. Confirm before relying on it in log searches.
124  FC_LOG_MESSAGE( error, "process exited with: ${message} ",
125  ("message", boost::system::system_error(ec).what())) ) ) );
126  }
127  }
128  }
129 
131 
132  /***
133  * @brief set the default number of threads for the io service
134  *
135  * Sets the number of threads for the io service. This will throw
136  * an exception if called more than once.
137  *
138  * @param num_threads the number of threads
139  */
// Stores num_threads in the static num_io_threads, which the constructor
// reads when it decides how many worker threads to start.
// NOTE(review): listing line 141 is not present in this listing, so the
// "throws if called more than once" check mentioned in the comment above is
// not visible here. Confirm against the real source.
140  void default_io_service_scope::set_num_threads(uint16_t num_threads) {
142  num_io_threads = num_threads;
143  }
144 
146 
147  /***
148  * Default constructor
149  */
// Body of the default_io_service_scope constructor. The signature line
// (listing line 150) is not present in this listing.
// Allocates the io_service and a work object bound to it, settles the worker
// count, then starts that many boost threads, each of which runs the
// io_service.
151  {
152  io = new boost::asio::io_service();
153  the_work = new boost::asio::io_service::work(*io);
154 
155  if( num_io_threads == 0 )
156  {
157  // No thread count was configured, so choose one here: use the hardware
158  // concurrency reported by boost, but never fewer than 8 threads.
159  num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U );
160  }
161 
162  for( uint16_t i = 0; i < num_io_threads; ++i )
163  {
   // Threads are heap-allocated and kept as raw pointers in asio_threads.
164  asio_threads.push_back( new boost::thread( [i,this]()
165  {
166  fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) );
167 
   // NOTE(review): listing line 170 (the scope-exit body) is not present in
   // this listing. The cross-reference index lists fc::thread::cleanup, so it
   // is presumably called here. Confirm against the real source.
168  BOOST_SCOPE_EXIT(void)
169  {
171  }
172  BOOST_SCOPE_EXIT_END
173 
   // Keep servicing the io_service until it reports stopped. An exception
   // that escapes run() is logged and run() is entered again.
174  while (!io->stopped())
175  {
176  try
177  {
178  io->run();
179  }
180  catch (const fc::exception& e)
181  {
182  elog("Caught unhandled exception in asio service loop: ${e}", ("e", e));
183  }
184  catch (const std::exception& e)
185  {
186  elog("Caught unhandled exception in asio service loop: ${e}", ("e", e.what()));
187  }
188  catch (...)
189  {
190  elog("Caught unhandled exception in asio service loop");
191  }
192  }
193  }) );
194  } // build thread loop
195  } // end of constructor
196 
197  /***
198  * destructor
199  */
// Body of the default_io_service_scope destructor. The signature line
// (listing line 200) is not present in this listing.
// Teardown order: release the work object, stop the io_service, join every
// worker thread, delete the io_service, then delete the thread objects.
201  {
202  delete the_work;
203  io->stop();
   // Join before deleting io: the worker lambdas dereference io in their loop.
204  for( auto asio_thread : asio_threads )
205  {
206  asio_thread->join();
207  }
208  delete io;
   // The thread objects were allocated with new in the constructor.
209  for( auto asio_thread : asio_threads )
210  {
211  delete asio_thread;
212  }
213  } // end of destructor
214 
215  /***
216  * @brief create an io_service
217  * @returns the io_service
218  */
219  boost::asio::io_service& default_io_service() {
220  static default_io_service_scope fc_asio_service[1];
221  return *fc_asio_service[0].io;
222  }
223 
224  namespace tcp {
// Resolves hostname:port to a list of TCP endpoints.
// Starts an asynchronous resolve whose completion is routed to
// detail::resolve_handler, then waits on the promise p and returns its
// value. Anything thrown inside the try block goes through
// FC_RETHROW_EXCEPTIONS.
// NOTE(review): listing lines 229-230 are not present in this listing; they
// presumably declare `res` and `p`, which the visible lines use. Confirm
// against the real source.
225  std::vector<boost::asio::ip::tcp::endpoint> resolve( const std::string& hostname, const std::string& port)
226  {
227  try
228  {
231  res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
232  boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
233  return p->wait();
234  }
235  FC_RETHROW_EXCEPTIONS(warn, "")
236  }
237  }
238  namespace udp {
// Resolves hostname:port to a list of UDP endpoints.
// Creates a promise, starts an asynchronous resolve whose completion is
// routed to detail::resolve_handler, then waits on the promise and returns
// its value. Anything thrown inside the try block goes through
// FC_RETHROW_EXCEPTIONS.
// NOTE(review): the visible body calls async_resolve on `res`, not on the
// parameter `r`. `res` is presumably declared on listing line 243, which is
// not present in this listing. Confirm whether `r` is intentionally unused.
239  std::vector<udp::endpoint> resolve( resolver& r, const std::string& hostname, const std::string& port)
240  {
241  try
242  {
244  promise<std::vector<endpoint> >::ptr p = promise<std::vector<endpoint> >::create("udp::resolve completion");
245  res.async_resolve( resolver::query(hostname,port),
246  boost::bind( detail::resolve_handler<endpoint,resolver_iterator>, p, _1, _2 ) );
247  return p->wait();
248  }
249  FC_RETHROW_EXCEPTIONS(warn, "")
250  }
251  }
252 
253 } } // namespace fc::asio
fc::promise
Definition: future.hpp:109
fc::thread::current
static thread & current()
Definition: thread.cpp:125
fc::asio::detail::read_write_handler_with_buffer::read_write_handler_with_buffer
read_write_handler_with_buffer(const promise< size_t >::ptr &p, const std::shared_ptr< const char > &buffer)
Definition: asio.cpp:43
fc::asio::detail::read_write_handler_with_buffer::operator()
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:48
fc::promise::ptr
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
fc::thread::set_name
void set_name(const string &n)
associates a name with this thread.
Definition: thread.cpp:143
fc::asio::detail::resolve_handler
void resolve_handler(const typename promise< std::vector< EndpointType > >::ptr &p, const boost::system::error_code &ec, IteratorType itr)
Definition: asio.cpp:110
fc::exception
Used to generate a useful error report when an exception is thrown.
Definition: exception.hpp:56
fc::to_string
std::string to_string(double)
Definition: string.cpp:73
fc::promise::wait
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
fc
Definition: api.hpp:15
fc::asio::detail::error_handler_ec
void error_handler_ec(promise< boost::system::error_code > *p, const boost::system::error_code &ec)
Definition: asio.cpp:104
fc::asio::tcp::resolver
boost::asio::ip::tcp::resolver resolver
Definition: asio.hpp:241
fc::asio::default_io_service_scope
Definition: asio.hpp:74
fc::exception_ptr
std::shared_ptr< exception > exception_ptr
Definition: exception.hpp:131
fc::asio::default_io_service_scope::~default_io_service_scope
~default_io_service_scope()
Definition: asio.cpp:200
asio.hpp
fc::asio::default_io_service_scope::get_num_threads
static uint16_t get_num_threads()
Definition: asio.cpp:145
thread.hpp
fc::asio::default_io_service_scope::default_io_service_scope
default_io_service_scope()
Definition: asio.cpp:150
fc::asio::detail::read_write_handler_ec
void read_write_handler_ec(promise< size_t > *p, boost::system::error_code *oec, const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:72
fc::promise::set_value
void set_value(const T &v)
Definition: future.hpp:136
FC_ASSERT
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
exception.hpp
Defines exceptions used by fc.
fc::asio::detail::read_write_handler::read_write_handler
read_write_handler(const promise< size_t >::ptr &p)
Definition: asio.cpp:14
fc::asio::default_io_service_scope::num_io_threads
static uint16_t num_io_threads
Definition: asio.hpp:89
fc::asio::detail::error_handler
void error_handler(const promise< void >::ptr &p, const boost::system::error_code &ec)
Definition: asio.cpp:77
fc::promise_base::set_exception
void set_exception(const fc::exception_ptr &e)
Definition: future.cpp:44
FC_RETHROW_EXCEPTIONS
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catches all exceptions, std::exceptions, and ... and rethrows them after appending the provided log message.
Definition: exception.hpp:464
logger.hpp
fc::asio::detail::read_write_handler::operator()
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:19
fc::asio::default_io_service
boost::asio::io_service & default_io_service()
Definition: asio.cpp:219
fc::thread::cleanup
static void cleanup()
Definition: thread.cpp:131
fc::asio::udp::resolve
std::vector< endpoint > resolve(resolver &r, const std::string &hostname, const std::string &port)
resolve all udp::endpoints for hostname:port
Definition: asio.cpp:239
fc::asio::default_io_service_scope::set_num_threads
static void set_num_threads(uint16_t num_threads)
Definition: asio.cpp:140
fc::asio::default_io_service_scope::io
boost::asio::io_service * io
Definition: asio.hpp:84
fc::asio::tcp::resolve
std::vector< endpoint > resolve(const std::string &hostname, const std::string &port)
Definition: asio.cpp:225
fc::asio::udp::resolver
boost::asio::ip::udp::resolver resolver
Definition: asio.hpp:272
FC_LOG_MESSAGE
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
Definition: log_message.hpp:163
elog
#define elog(FORMAT,...)
Definition: logger.hpp:129