BitShares-Core  7.0.2
BitShares blockchain node software and command-line wallet software
asio.hpp
Go to the documentation of this file.
1 /*
2  * @brief defines wrappers for boost::asio functions
3  */
4 #pragma once
5 #include <boost/asio.hpp>
6 #include <boost/bind.hpp>
7 #include <boost/thread.hpp>
8 #include <vector>
9 #include <fc/thread/future.hpp>
10 #include <fc/io/iostream.hpp>
11 
12 namespace fc {
16 namespace asio {
20  namespace detail {
21 
23  {
24  public:
26  void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
27  private:
28  promise<size_t>::ptr _completion_promise;
29  };
30 
32  {
33  public:
35  const std::shared_ptr<const char>& buffer);
36  void operator()(const boost::system::error_code& ec, size_t bytes_transferred);
37  private:
38  promise<size_t>::ptr _completion_promise;
39  std::shared_ptr<const char> _buffer;
40  };
41 
42  //void read_write_handler( const promise<size_t>::ptr& p,
43  // const boost::system::error_code& ec,
44  // size_t bytes_transferred );
46  boost::system::error_code* oec,
47  const boost::system::error_code& ec,
48  size_t bytes_transferred );
49  void error_handler( const promise<void>::ptr& p,
50  const boost::system::error_code& ec );
52  const boost::system::error_code& ec );
53 
54  template<typename C>
55  struct non_blocking {
56  bool operator()( C& c ) { return c.non_blocking(); }
57  bool operator()( C& c, bool s ) { c.non_blocking(s); return true; }
58  };
59 
60 #if WIN32 // windows stream handles do not support non blocking!
61  template<>
62  struct non_blocking<boost::asio::windows::stream_handle> {
63  typedef boost::asio::windows::stream_handle C;
64  bool operator()( C& ) { return false; }
65  bool operator()( C&, bool ) { return false; }
66  };
67 #endif
68  } // end of namespace detail
69 
70  /***
71  * A structure for holding the boost io service and associated
72  * threads
73  */
75  {
76  public:
79  static void set_num_threads(uint16_t num_threads);
80  static uint16_t get_num_threads();
81  boost::asio::io_service* io;
82  private:
83  std::vector<boost::thread*> asio_threads;
84  boost::asio::io_service::work* the_work;
85  protected:
86  static uint16_t num_io_threads; // marked protected to help with testing
87  };
88 
95  boost::asio::io_service& default_io_service();
96 
102  template<typename AsyncReadStream, typename MutableBufferSequence>
103  size_t read( AsyncReadStream& s, const MutableBufferSequence& buf ) {
104  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::read");
105  boost::asio::async_read( s, buf, detail::read_write_handler(p) );
106  return p->wait();
107  }
121  template<typename AsyncReadStream, typename MutableBufferSequence>
122  future<size_t> read_some(AsyncReadStream& s, const MutableBufferSequence& buf)
123  {
124  promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
125  s.async_read_some(buf, detail::read_write_handler(completion_promise));
126  return completion_promise;//->wait();
127  }
128 
129  template<typename AsyncReadStream>
130  future<size_t> read_some(AsyncReadStream& s, char* buffer, size_t length, size_t offset = 0)
131  {
132  promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
133  s.async_read_some(boost::asio::buffer(buffer + offset, length),
134  detail::read_write_handler(completion_promise));
135  return completion_promise;//->wait();
136  }
137 
138  template<typename AsyncReadStream>
139  future<size_t> read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
140  {
141  promise<size_t>::ptr completion_promise = promise<size_t>::create("fc::asio::async_read_some");
142  s.async_read_some(boost::asio::buffer(buffer.get() + offset, length),
143  detail::read_write_handler_with_buffer(completion_promise, buffer));
144  return completion_promise;//->wait();
145  }
146 
147  template<typename AsyncReadStream, typename MutableBufferSequence>
148  void async_read_some(AsyncReadStream& s, const MutableBufferSequence& buf, promise<size_t>::ptr completion_promise)
149  {
150  s.async_read_some(buf, detail::read_write_handler(completion_promise));
151  }
152 
153  template<typename AsyncReadStream>
154  void async_read_some(AsyncReadStream& s, char* buffer,
155  size_t length, promise<size_t>::ptr completion_promise)
156  {
157  s.async_read_some(boost::asio::buffer(buffer, length), detail::read_write_handler(completion_promise));
158  }
159 
160  template<typename AsyncReadStream>
161  void async_read_some(AsyncReadStream& s, const std::shared_ptr<char>& buffer,
162  size_t length, size_t offset, promise<size_t>::ptr completion_promise)
163  {
164  s.async_read_some(boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(completion_promise, buffer));
165  }
166 
167  template<typename AsyncReadStream>
168  size_t read_some( AsyncReadStream& s, boost::asio::streambuf& buf )
169  {
170  char buffer[1024];
171  size_t bytes_read = read_some( s, boost::asio::buffer( buffer, sizeof(buffer) ) );
172  buf.sputn( buffer, bytes_read );
173  return bytes_read;
174  }
175 
179  template<typename AsyncWriteStream, typename ConstBufferSequence>
180  size_t write( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
181  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write");
182  boost::asio::async_write(s, buf, detail::read_write_handler(p));
183  return p->wait();
184  }
185 
191  template<typename AsyncWriteStream, typename ConstBufferSequence>
192  future<size_t> write_some( AsyncWriteStream& s, const ConstBufferSequence& buf ) {
193  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
194  s.async_write_some( buf, detail::read_write_handler(p));
195  return p; //->wait();
196  }
197 
198  template<typename AsyncWriteStream>
199  future<size_t> write_some( AsyncWriteStream& s, const char* buffer,
200  size_t length, size_t offset = 0) {
201  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
202  s.async_write_some( boost::asio::buffer(buffer + offset, length), detail::read_write_handler(p));
203  return p; //->wait();
204  }
205 
206  template<typename AsyncWriteStream>
207  future<size_t> write_some( AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
208  size_t length, size_t offset ) {
209  promise<size_t>::ptr p = promise<size_t>::create("fc::asio::write_some");
210  s.async_write_some( boost::asio::buffer(buffer.get() + offset, length), detail::read_write_handler_with_buffer(p, buffer));
211  return p; //->wait();
212  }
213 
219  template<typename AsyncWriteStream, typename ConstBufferSequence>
220  void async_write_some(AsyncWriteStream& s, const ConstBufferSequence& buf, promise<size_t>::ptr completion_promise) {
221  s.async_write_some(buf, detail::read_write_handler(completion_promise));
222  }
223 
224  template<typename AsyncWriteStream>
225  void async_write_some(AsyncWriteStream& s, const char* buffer,
226  size_t length, promise<size_t>::ptr completion_promise) {
227  s.async_write_some(boost::asio::buffer(buffer, length),
228  detail::read_write_handler(completion_promise));
229  }
230 
231  template<typename AsyncWriteStream>
232  void async_write_some(AsyncWriteStream& s, const std::shared_ptr<const char>& buffer,
233  size_t length, size_t offset, promise<size_t>::ptr completion_promise) {
234  s.async_write_some(boost::asio::buffer(buffer.get() + offset, length),
235  detail::read_write_handler_with_buffer(completion_promise, buffer));
236  }
237 
238  namespace tcp {
240  typedef boost::asio::ip::tcp::resolver::iterator resolver_iterator;
242  std::vector<endpoint> resolve( const std::string& hostname, const std::string& port );
243 
249  template<typename SocketType, typename AcceptorType>
250  void accept( AcceptorType& acc, SocketType& sock ) {
251  promise<void>::ptr p = promise<void>::create("fc::asio::tcp::accept");
252  acc.async_accept( sock, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
253  p->wait();
254  //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
255  }
256 
261  template<typename AsyncSocket, typename EndpointType>
262  void connect( AsyncSocket& sock, const EndpointType& ep ) {
263  promise<void>::ptr p = promise<void>::create("fc::asio::tcp::connect");
264  sock.async_connect( ep, boost::bind( fc::asio::detail::error_handler, p, _1 ) );
265  p->wait();
266  //if( ec ) BOOST_THROW_EXCEPTION( boost::system::system_error(ec) );
267  }
268  }
269  namespace udp {
271  typedef boost::asio::ip::udp::resolver::iterator resolver_iterator;
274  std::vector<endpoint> resolve( resolver& r, const std::string& hostname,
275  const std::string& port );
276  }
277 
278  template<typename AsyncReadStream>
279  class istream : public virtual fc::istream
280  {
281  public:
282  istream( std::shared_ptr<AsyncReadStream> str )
283  :_stream( std::move(str) ){}
284 
285  virtual size_t readsome( char* buf, size_t len )
286  {
287  return fc::asio::read_some(*_stream, buf, len).wait();
288  }
289  virtual size_t readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
290  {
291  return fc::asio::read_some(*_stream, buf, len, offset).wait();
292  }
293 
294  private:
295  std::shared_ptr<AsyncReadStream> _stream;
296  };
297 
298  template<typename AsyncWriteStream>
299  class ostream : public virtual fc::ostream
300  {
301  public:
302  ostream( std::shared_ptr<AsyncWriteStream> str )
303  :_stream( std::move(str) ){}
304 
305  virtual size_t writesome( const char* buf, size_t len )
306  {
307  return fc::asio::write_some(*_stream, buf, len).wait();
308  }
309 
310  virtual size_t writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
311  {
312  return fc::asio::write_some(*_stream, buf, len, offset).wait();
313  }
314 
315  virtual void close(){ _stream->close(); }
316  virtual void flush() {}
317  private:
318  std::shared_ptr<AsyncWriteStream> _stream;
319  };
320 
321 
322 } } // namespace fc::asio
323 
fc::asio::detail::read_write_handler
Definition: asio.hpp:22
fc::promise
Definition: future.hpp:109
fc::asio::detail::read_write_handler_with_buffer::read_write_handler_with_buffer
read_write_handler_with_buffer(const promise< size_t >::ptr &p, const std::shared_ptr< const char > &buffer)
Definition: asio.cpp:43
fc::future< size_t >
fc::asio::detail::read_write_handler_with_buffer::operator()
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:48
fc::promise::ptr
std::shared_ptr< promise< T > > ptr
Definition: future.hpp:111
fc::asio::ostream::writesome
virtual size_t writesome(const char *buf, size_t len)
Definition: asio.hpp:305
fc::promise::wait
const T & wait(const microseconds &timeout=microseconds::maximum())
Definition: future.hpp:127
fc::asio::detail::non_blocking::operator()
bool operator()(C &c)
Definition: asio.hpp:56
fc::asio::istream
Definition: asio.hpp:279
fc::asio::ostream::flush
virtual void flush()
Definition: asio.hpp:316
fc::asio::read_some
future< size_t > read_some(AsyncReadStream &s, const MutableBufferSequence &buf)
Definition: asio.hpp:122
fc
Definition: api.hpp:15
fc::asio::detail::non_blocking::operator()
bool operator()(C &c, bool s)
Definition: asio.hpp:57
fc::asio::detail::error_handler_ec
void error_handler_ec(promise< boost::system::error_code > *p, const boost::system::error_code &ec)
Definition: asio.cpp:104
fc::asio::tcp::resolver
boost::asio::ip::tcp::resolver resolver
Definition: asio.hpp:241
fc::asio::default_io_service_scope
Definition: asio.hpp:74
fc::asio::write
size_t write(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write
Definition: asio.hpp:180
fc::promise::create
static ptr create(const char *desc FC_TASK_NAME_DEFAULT_ARG)
Definition: future.hpp:114
fc::asio::tcp::endpoint
boost::asio::ip::tcp::endpoint endpoint
Definition: asio.hpp:239
fc::asio::default_io_service_scope::~default_io_service_scope
~default_io_service_scope()
Definition: asio.cpp:200
fc::asio::write_some
future< size_t > write_some(AsyncWriteStream &s, const ConstBufferSequence &buf)
wraps boost::asio::async_write_some
Definition: asio.hpp:192
fc::asio::read
size_t read(AsyncReadStream &s, const MutableBufferSequence &buf)
wraps boost::asio::async_read
Definition: asio.hpp:103
fc::asio::ostream::ostream
ostream(std::shared_ptr< AsyncWriteStream > str)
Definition: asio.hpp:302
iostream.hpp
fc::asio::default_io_service_scope::get_num_threads
static uint16_t get_num_threads()
Definition: asio.cpp:145
fc::asio::async_read_some
void async_read_some(AsyncReadStream &s, const MutableBufferSequence &buf, promise< size_t >::ptr completion_promise)
Definition: asio.hpp:148
fc::asio::tcp::connect
void connect(AsyncSocket &sock, const EndpointType &ep)
wraps boost::asio::socket::async_connect
Definition: asio.hpp:262
fc::asio::udp::endpoint
boost::asio::ip::udp::endpoint endpoint
Definition: asio.hpp:270
fc::asio::default_io_service_scope::default_io_service_scope
default_io_service_scope()
Definition: asio.cpp:150
fc::ostream
Definition: iostream.hpp:41
fc::istream
Definition: iostream.hpp:11
fc::asio::detail::read_write_handler_ec
void read_write_handler_ec(promise< size_t > *p, boost::system::error_code *oec, const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:72
fc::asio::detail::non_blocking
Definition: asio.hpp:55
fc::asio::tcp::accept
void accept(AcceptorType &acc, SocketType &sock)
wraps boost::asio::async_accept
Definition: asio.hpp:250
fc::asio::detail::read_write_handler_with_buffer
Definition: asio.hpp:31
fc::asio::ostream
Definition: asio.hpp:299
future.hpp
fc::future::wait
const T & wait(const microseconds &timeout=microseconds::maximum()) const
Definition: future.hpp:228
fc::asio::istream::readsome
virtual size_t readsome(const std::shared_ptr< char > &buf, size_t len, size_t offset)
Definition: asio.hpp:289
fc::asio::detail::read_write_handler::read_write_handler
read_write_handler(const promise< size_t >::ptr &p)
Definition: asio.cpp:14
fc::asio::default_io_service_scope::num_io_threads
static uint16_t num_io_threads
Definition: asio.hpp:89
std
Definition: zeroed_array.hpp:76
fc::asio::detail::error_handler
void error_handler(const promise< void >::ptr &p, const boost::system::error_code &ec)
Definition: asio.cpp:77
fc::asio::tcp::resolver_iterator
boost::asio::ip::tcp::resolver::iterator resolver_iterator
Definition: asio.hpp:240
fc::asio::async_write_some
void async_write_some(AsyncWriteStream &s, const ConstBufferSequence &buf, promise< size_t >::ptr completion_promise)
wraps boost::asio::async_write_some
Definition: asio.hpp:220
fc::asio::detail::read_write_handler::operator()
void operator()(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: asio.cpp:19
fc::asio::default_io_service
boost::asio::io_service & default_io_service()
Definition: asio.cpp:219
fc::asio::ostream::close
virtual void close()
Definition: asio.hpp:315
fc::asio::istream::readsome
virtual size_t readsome(char *buf, size_t len)
Definition: asio.hpp:285
fc::asio::ostream::writesome
virtual size_t writesome(const std::shared_ptr< const char > &buf, size_t len, size_t offset)
Definition: asio.hpp:310
fc::asio::udp::resolve
std::vector< endpoint > resolve(resolver &r, const std::string &hostname, const std::string &port)
resolve all udp::endpoints for hostname:port
Definition: asio.cpp:239
fc::asio::default_io_service_scope::set_num_threads
static void set_num_threads(uint16_t num_threads)
Definition: asio.cpp:140
fc::asio::default_io_service_scope::io
boost::asio::io_service * io
Definition: asio.hpp:84
fc::asio::istream::istream
istream(std::shared_ptr< AsyncReadStream > str)
Definition: asio.hpp:282
fc::asio::tcp::resolve
std::vector< endpoint > resolve(const std::string &hostname, const std::string &port)
Definition: asio.cpp:225
fc::asio::udp::resolver
boost::asio::ip::udp::resolver resolver
Definition: asio.hpp:272
fc::asio::udp::resolver_iterator
boost::asio::ip::udp::resolver::iterator resolver_iterator
Definition: asio.hpp:271