BitShares-Core  7.0.2
BitShares blockchain node software and command-line wallet software
elasticsearch.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 oxarbitrage, and contributors.
3  *
4  * The MIT License
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  */
25 
26 #include <boost/algorithm/string/join.hpp>
27 
28 #include <fc/io/json.hpp>
30 
31 static size_t curl_write_function(void *contents, size_t size, size_t nmemb, void *userp)
32 {
33  ((std::string*)userp)->append((char*)contents, size * nmemb);
34  return size * nmemb;
35 }
36 
37 namespace graphene { namespace utilities {
38 
39 static bool handle_bulk_response( uint16_t http_code, const std::string& curl_read_buffer )
40 {
42  {
43  // all good, but check errors in response
44  fc::variant j = fc::json::from_string(curl_read_buffer);
45  bool errors = j["errors"].as_bool();
46  if( errors )
47  {
48  elog( "ES returned 200 but with errors: ${e}", ("e", curl_read_buffer) );
49  return false;
50  }
51  return true;
52  }
53 
55  {
56  elog( "413 error: Request too large. Can be low disk space. ${e}", ("e", curl_read_buffer) );
57  }
58  else if( curl_wrapper::http_response_code::HTTP_401 == http_code )
59  {
60  elog( "401 error: Unauthorized. ${e}", ("e", curl_read_buffer) );
61  }
62  else
63  {
64  elog( "${code} error: ${e}", ("code", std::to_string(http_code)) ("e", curl_read_buffer) );
65  }
66  return false;
67 }
68 
69 std::vector<std::string> createBulk(const fc::mutable_variant_object& bulk_header, std::string&& data)
70 {
71  std::vector<std::string> bulk;
72  fc::mutable_variant_object final_bulk_header;
73  final_bulk_header["index"] = bulk_header;
74  bulk.push_back(fc::json::to_string(final_bulk_header));
75  bulk.emplace_back(std::move(data));
76 
77  return bulk;
78 }
79 
81 {
82  return ( http_response_code::HTTP_200 == code );
83 }
84 
85 CURL* curl_wrapper::init_curl()
86 {
87  CURL* curl = curl_easy_init();
88  if( curl )
89  {
90  curl_easy_setopt( curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2 );
91  return curl;
92  }
93  FC_THROW( "Unable to init cURL" );
94 }
95 
96 curl_slist* curl_wrapper::init_request_headers()
97 {
98  curl_slist* request_headers = curl_slist_append( NULL, "Content-Type: application/json" );
99  FC_ASSERT( request_headers, "Unable to init cURL request headers" );
100  return request_headers;
101 }
102 
104 {
105  curl_easy_setopt( curl.get(), CURLOPT_HTTPHEADER, request_headers.get() );
106  curl_easy_setopt( curl.get(), CURLOPT_USERAGENT, "bitshares-core/6.1" );
107 }
108 
109 void curl_wrapper::curl_deleter::operator()( CURL* p_curl ) const
110 {
111  if( p_curl )
112  curl_easy_cleanup( p_curl );
113 }
114 
115 void curl_wrapper::curl_slist_deleter::operator()( curl_slist* slist ) const
116 {
117  if( slist )
118  curl_slist_free_all( slist );
119 }
120 
122  const std::string& url,
123  const std::string& auth,
124  const std::string& query ) const
125 {
127 
128  // Note: the variable curl has a long lifetime, it only gets initialized once, then be used many times,
129  // thus we need to clear old data
130 
131  // Note: host and auth are always the same in the program, ideally we don't need to set them every time
132  curl_easy_setopt( curl.get(), CURLOPT_URL, url.c_str() );
133  if( !auth.empty() )
134  curl_easy_setopt( curl.get(), CURLOPT_USERPWD, auth.c_str() );
135 
136  // Empty for GET, POST or HEAD, non-empty for DELETE or PUT
137  static const std::vector<std::string> http_request_method_custom_str = {
138  "", // GET
139  "", // POST
140  "", // HEAD
141  "PUT",
142  "DELETE",
143  "PATCH",
144  "OPTIONS"
145  };
146  const auto& custom_request = http_request_method_custom_str[static_cast<size_t>(method)];
147  const auto* p_custom_request = custom_request.empty() ? NULL : custom_request.c_str();
148  curl_easy_setopt( curl.get(), CURLOPT_CUSTOMREQUEST, p_custom_request );
149 
152  {
153  curl_easy_setopt( curl.get(), CURLOPT_HTTPGET, false );
154  curl_easy_setopt( curl.get(), CURLOPT_POST, true );
155  curl_easy_setopt( curl.get(), CURLOPT_POSTFIELDS, query.c_str() );
156  }
157  else // GET or DELETE (only these are used in this file)
158  {
159  curl_easy_setopt( curl.get(), CURLOPT_POSTFIELDS, NULL );
160  curl_easy_setopt( curl.get(), CURLOPT_POST, false );
161  curl_easy_setopt( curl.get(), CURLOPT_HTTPGET, true );
162  }
163 
164  curl_easy_setopt( curl.get(), CURLOPT_WRITEFUNCTION, curl_write_function );
165  curl_easy_setopt( curl.get(), CURLOPT_WRITEDATA, (void *)(&resp.content) );
166  curl_easy_perform( curl.get() );
167 
168  long code;
169  curl_easy_getinfo( curl.get(), CURLINFO_RESPONSE_CODE, &code );
170  resp.code = static_cast<uint16_t>( code );
171 
172  return resp;
173 }
174 
175 curl_wrapper::http_response curl_wrapper::get( const std::string& url, const std::string& auth ) const
176 {
177  return request( http_request_method::HTTP_GET, url, auth, "" );
178 }
179 
180 curl_wrapper::http_response curl_wrapper::del( const std::string& url, const std::string& auth ) const
181 {
182  return request( http_request_method::HTTP_DELETE, url, auth, "" );
183 }
184 
185 curl_wrapper::http_response curl_wrapper::post( const std::string& url, const std::string& auth,
186  const std::string& query ) const
187 {
188  return request( http_request_method::HTTP_POST, url, auth, query );
189 }
190 
191 curl_wrapper::http_response curl_wrapper::put( const std::string& url, const std::string& auth,
192  const std::string& query ) const
193 {
194  return request( http_request_method::HTTP_PUT, url, auth, query );
195 }
196 
198 {
199  const auto response = curl.get( base_url + "_nodes", auth );
200 
201  // Note: response.code is ignored here
202  return !response.content.empty();
203 }
204 
205 std::string es_client::get_version() const
206 { try {
207  const auto response = curl.get( base_url, auth );
208  if( !response.is_200() )
209  FC_THROW( "Error on es_client::get_version(): code = ${code}, message = ${message} ",
210  ("code", response.code) ("message", response.content) );
211 
212  fc::variant content = fc::json::from_string( response.content );
213  return content["version"]["number"].as_string();
214 } FC_CAPTURE_LOG_AND_RETHROW( (base_url) ) } // GCOVR_EXCL_LINE
215 
216 void es_client::check_version_7_or_above( bool& result ) const noexcept
217 {
218  static const int64_t version_7 = 7;
219  try {
220  const auto es_version = get_version();
221  ilog( "ES version detected: ${v}", ("v", es_version) );
222  auto dot_pos = es_version.find('.');
223  result = ( std::stoi(es_version.substr(0,dot_pos)) >= version_7 );
224  }
225  catch( ... )
226  {
227  wlog( "Unable to get ES version, assuming it is 7 or above" );
228  result = true;
229  }
230 }
231 
232 bool es_client::send_bulk( const std::vector<std::string>& bulk_lines ) const
233 {
234  auto bulk_str = boost::algorithm::join( bulk_lines, "\n" ) + "\n";
235  const auto response = curl.post( base_url + "_bulk", auth, bulk_str );
236 
237  return handle_bulk_response( response.code, response.content );
238 }
239 
240 bool es_client::del( const std::string& path ) const
241 {
242  const auto response = curl.del( base_url + path, auth );
243 
244  // Note: response.code is ignored here
245  return !response.content.empty();
246 }
247 
248 std::string es_client::get( const std::string& path ) const
249 {
250  const auto response = curl.get( base_url + path, auth );
251 
252  // Note: response.code is ignored here
253  return response.content;
254 }
255 
256 std::string es_client::query( const std::string& path, const std::string& query ) const
257 {
258  const auto response = curl.post( base_url + path, auth, query );
259 
260  // Note: response.code is ignored here
261  return response.content;
262 }
263 
264 fc::variant es_data_adaptor::adapt( const fc::variant_object& op, uint16_t max_depth )
265 {
266  if( 0 == max_depth )
267  {
268  fc::variant v;
270  return v;
271  }
272 
274 
275  // Note:
276  // These fields are maps, they are stored redundantly in ES,
277  // one instance is a nested string array using the original field names (for backward compatibility, although
278  // ES queries return results in JSON format a little differently than node APIs),
279  // and a new instance is an object array with "_object" suffix added to the field name.
280  static const std::unordered_set<std::string> to_string_array_fields = { "account_auths", "address_auths",
281  "key_auths" };
282 
283  // Note:
284  // These fields are stored redundantly in ES,
285  // one instance is a string using the original field names (originally for backward compatibility,
286  // but new fields are added here as well),
287  // and a new instance is a nested object or nested object array with "_object" suffix added to the field name.
288  //
289  // Why do we add new fields here?
290  // Because we want to keep the JSON format made by node (stored in ES as a string), and store the object format
291  // at the same time for more flexible query.
292  //
293  // Object arrays not listed in this map (if any) are stored as nested objects only.
294  static const std::unordered_map<std::string, data_type> to_string_fields = {
295  { "parameters", data_type::array_type }, // in committee proposals, current_fees.parameters
296  { "op", data_type::static_variant_type }, // proposal_create_op.proposed_ops[*].op
297  { "proposed_ops", data_type::array_type }, // proposal_create_op.proposed_ops
298  { "operations", data_type::array_type }, // proposal_object.operations
299  { "initializer", data_type::static_variant_type }, // for workers
300  { "policy", data_type::static_variant_type }, // for vesting balances
301  { "predicates", data_type::array_type }, // for assert_operation
302  { "active_special_authority", data_type::static_variant_type }, // for accounts
303  { "owner_special_authority", data_type::static_variant_type }, // for accounts
304  { "htlc_preimage_hash", data_type::static_variant_type }, // for HTLCs
305  { "argument", data_type::static_variant_type }, // for custom authority, restriction.argument
306  { "feeds", data_type::map_type }, // asset_bitasset_data_object.feeds
307  { "acceptable_collateral", data_type::map_type }, // for credit offers
308  { "acceptable_borrowers", data_type::map_type }, // for credit offers
309  { "on_fill", data_type::array_type } // for limit orders
310  };
311  std::vector<std::pair<std::string, fc::variants>> original_arrays;
312  std::vector<std::string> keys_to_rename;
313  for( auto& i : o )
314  {
315  const std::string& name = i.key();
316  auto& element = i.value();
317  if( element.is_object() )
318  {
319  const auto& vo = element.get_object();
320  if( vo.contains(name.c_str()) ) // transfer_operation.amount.amount
321  keys_to_rename.emplace_back(name);
322  element = adapt( vo, max_depth - 1 );
323  continue;
324  }
325 
326  if( !element.is_array() )
327  continue;
328 
329  auto& array = element.get_array();
330  if( to_string_fields.find(name) != to_string_fields.end() )
331  {
332  // make a backup (only if depth is sufficient) and convert to string
333  if( max_depth > 1 )
334  original_arrays.emplace_back( name, array );
335  element = fc::json::to_string(element);
336  }
337  else if( to_string_array_fields.find(name) != to_string_array_fields.end() )
338  {
339  // make a backup (only if depth is sufficient) and adapt the original
340  if( max_depth > 1 )
341  {
342  auto backup = array;
343  original_arrays.emplace_back( name, std::move( backup ) );
344  }
345  in_situ_adapt( array, max_depth - 1 );
346  }
347  else
348  in_situ_adapt( array, max_depth - 1 );
349  }
350 
351  for( const auto& i : keys_to_rename ) // transfer_operation.amount
352  {
353  std::string new_name = i + "_";
354  o[new_name] = fc::variant(o[i]);
355  o.erase(i);
356  }
357 
358  if( o.find("nonce") != o.end() )
359  {
360  o["nonce"] = o["nonce"].as_string();
361  }
362 
363  if( o.find("owner") != o.end() && o["owner"].is_string() ) // vesting_balance_*_operation.owner
364  {
365  o["owner_"] = o["owner"].as_string();
366  o.erase("owner");
367  }
368 
369  for( const auto& pair : original_arrays )
370  {
371  const auto& name = pair.first;
372  auto& value = pair.second;
373  auto type = data_type::map_type;
374  if( to_string_fields.find(name) != to_string_fields.end() )
375  type = to_string_fields.at(name);
376  o[name + "_object"] = adapt( value, type, max_depth - 1 );
377  }
378 
379  fc::variant v;
381  return v;
382 }
383 
384 fc::variant es_data_adaptor::adapt( const fc::variants& v, data_type type, uint16_t max_depth )
385 {
386  if( data_type::static_variant_type == type )
387  return adapt_static_variant( v, max_depth );
388 
389  // map_type or array_type
390  fc::variants vs;
391  vs.reserve( v.size() );
392  for( const auto& item : v )
393  {
394  if( item.is_array() )
395  {
396  if( data_type::map_type == type )
397  vs.push_back( adapt_map_item( item.get_array(), max_depth ) );
398  else // assume it is a static_variant array
399  vs.push_back( adapt_static_variant( item.get_array(), max_depth ) );
400  }
401  else if( item.is_object() ) // object array
402  vs.push_back( adapt( item.get_object(), max_depth ) );
403  else
404  wlog( "Type of item is unexpected: ${item}", ("item", item) );
405  }
406 
407  fc::variant nv;
409  return nv;
410 }
411 
413  const fc::variant& v, fc::mutable_variant_object& mv, const std::string& prefix, uint16_t max_depth )
414 {
415  FC_ASSERT( max_depth > 0, "Internal error" );
416  if( v.is_object() )
417  mv[prefix + "_object"] = adapt( v.get_object(), max_depth - 1 );
418  else if( v.is_int64() || v.is_uint64() )
419  mv[prefix + "_int"] = v;
420  else if( v.is_bool() )
421  mv[prefix + "_bool"] = v;
422  else if( v.is_string() )
423  mv[prefix + "_string"] = v.get_string();
424  else
425  mv[prefix + "_string"] = fc::json::to_string( v );
426  // Note: we don't use double here, and we convert nulls and blobs to strings,
427  // arrays and pairs (i.e. in custom authorities) are converted to strings,
428  // static_variants and maps (if any) are converted to strings too.
429 }
430 
432 {
433  if( 0 == max_depth )
434  {
435  fc::variant nv;
437  return nv;
438  }
439 
440  FC_ASSERT( v.size() == 2, "Internal error" );
442 
443  extract_data_from_variant( v[0], mv, "key", max_depth );
444  extract_data_from_variant( v[1], mv, "data", max_depth );
445 
446  fc::variant nv;
448  return nv;
449 }
450 
452 {
453  if( 0 == max_depth )
454  {
455  fc::variant nv;
457  return nv;
458  }
459 
460  FC_ASSERT( v.size() == 2, "Internal error" );
462 
463  mv["which"] = v[0];
464  extract_data_from_variant( v[1], mv, "data", max_depth );
465 
466  fc::variant nv;
468  return nv;
469 }
470 
471 void es_data_adaptor::in_situ_adapt( fc::variants& v, uint16_t max_depth )
472 {
473  for( auto& array_element : v )
474  {
475  if( array_element.is_object() )
476  array_element = adapt( array_element.get_object(), max_depth );
477  else if( array_element.is_array() )
478  in_situ_adapt( array_element.get_array(), max_depth );
479  else
480  array_element = array_element.as_string();
481  }
482 }
483 
484 } } // end namespace graphene::utilities
graphene::utilities::es_data_adaptor::adapt
static fc::variant adapt(const fc::variant_object &op, uint16_t max_depth)
Definition: elasticsearch.cpp:264
FC_PACK_MAX_DEPTH
#define FC_PACK_MAX_DEPTH
Definition: config.hpp:3
fc::variant_object
An order-perserving dictionary of variant's.
Definition: variant_object.hpp:20
fc::variant::is_string
bool is_string() const
Definition: variant.cpp:314
fc::variant::is_uint64
bool is_uint64() const
Definition: variant.cpp:326
wlog
#define wlog(FORMAT,...)
Definition: logger.hpp:123
graphene::utilities::es_client::check_status
bool check_status() const
Definition: elasticsearch.cpp:197
graphene::utilities::curl_wrapper::request
http_response request(http_request_method method, const std::string &url, const std::string &auth, const std::string &query) const
Definition: elasticsearch.cpp:121
fc::mutable_variant_object
An order-perserving dictionary of variant's.
Definition: variant_object.hpp:108
graphene::utilities::createBulk
std::vector< std::string > createBulk(const fc::mutable_variant_object &bulk_header, std::string &&data)
Definition: elasticsearch.cpp:69
graphene::utilities::curl_wrapper::del
http_response del(const std::string &url, const std::string &auth) const
Definition: elasticsearch.cpp:180
graphene::utilities::es_client::del
bool del(const std::string &path) const
Definition: elasticsearch.cpp:240
fc::to_string
std::string to_string(double)
Definition: string.cpp:73
graphene::utilities::es_client::get
std::string get(const std::string &path) const
Definition: elasticsearch.cpp:248
graphene::utilities::es_data_adaptor::data_type
data_type
Definition: elasticsearch.hpp:121
elasticsearch.hpp
FC_THROW
#define FC_THROW( ...)
Definition: exception.hpp:366
graphene::utilities::curl_wrapper::post
http_response post(const std::string &url, const std::string &auth, const std::string &query) const
Definition: elasticsearch.cpp:185
graphene::utilities::es_client::check_version_7_or_above
void check_version_7_or_above(bool &result) const noexcept
Definition: elasticsearch.cpp:216
fc::variant::get_string
const std::string & get_string() const
Definition: variant.cpp:575
graphene::utilities::curl_wrapper::http_response
Definition: elasticsearch.hpp:59
fc::json::from_string
static variant from_string(const string &utf8_str, parse_type ptype=legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:458
fc::variant::as_string
std::string as_string() const
Definition: variant.cpp:469
fc::variant::is_bool
bool is_bool() const
Definition: variant.cpp:318
graphene::utilities::curl_wrapper::http_response::is_200
bool is_200() const
Definition: elasticsearch.cpp:80
FC_CAPTURE_LOG_AND_RETHROW
#define FC_CAPTURE_LOG_AND_RETHROW(...)
Definition: exception.hpp:415
graphene::utilities::es_data_adaptor::adapt_map_item
static fc::variant adapt_map_item(const fc::variants &v, uint16_t max_depth)
Definition: elasticsearch.cpp:431
fc::json::to_string
static string to_string(const variant &v, output_formatting format=stringify_large_ints_and_doubles, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition: json.cpp:650
graphene::utilities::es_client::query
std::string query(const std::string &path, const std::string &query) const
Definition: elasticsearch.cpp:256
graphene::utilities::curl_wrapper::http_response_code::HTTP_200
static constexpr uint16_t HTTP_200
Definition: elasticsearch.hpp:54
fc::variant::as_bool
bool as_bool() const
Definition: variant.cpp:441
graphene::utilities::curl_wrapper::http_request_method::HTTP_GET
@ HTTP_GET
graphene::utilities::curl_wrapper::http_response_code::HTTP_401
static constexpr uint16_t HTTP_401
Definition: elasticsearch.hpp:55
graphene::utilities::curl_wrapper::http_request_method::HTTP_DELETE
@ HTTP_DELETE
graphene::utilities::curl_wrapper::http_response_code::HTTP_413
static constexpr uint16_t HTTP_413
Definition: elasticsearch.hpp:56
ilog
#define ilog(FORMAT,...)
Definition: logger.hpp:117
fc::variants
std::vector< variant > variants
Definition: variant.hpp:170
graphene::utilities::curl_wrapper::put
http_response put(const std::string &url, const std::string &auth, const std::string &query) const
Definition: elasticsearch.cpp:191
graphene::utilities::curl_wrapper::http_response::content
std::string content
Definition: elasticsearch.hpp:62
fc::variant::is_object
bool is_object() const
Definition: variant.cpp:363
fc::to_variant
void to_variant(const flat_set< T, A... > &var, variant &vo, uint32_t _max_depth)
Definition: flat.hpp:105
graphene::utilities::es_client::get_version
std::string get_version() const
Definition: elasticsearch.cpp:205
graphene::utilities::es_data_adaptor::adapt_static_variant
static fc::variant adapt_static_variant(const fc::variants &v, uint16_t max_depth)
Definition: elasticsearch.cpp:451
graphene::utilities::curl_wrapper::http_request_method
http_request_method
Definition: elasticsearch.hpp:41
fc::variant::is_int64
bool is_int64() const
Definition: variant.cpp:330
json.hpp
graphene::utilities::curl_wrapper::curl_wrapper
curl_wrapper()
Definition: elasticsearch.cpp:103
FC_ASSERT
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
Definition: exception.hpp:345
graphene::utilities::curl_wrapper::get
http_response get(const std::string &url, const std::string &auth) const
Definition: elasticsearch.cpp:175
fc::variant::get_object
variant_object & get_object()
Definition: variant.cpp:554
fc::variant
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition: variant.hpp:198
exception.hpp
Defines exception's used by fc.
graphene::utilities::curl_wrapper::http_request_method::HTTP_POST
@ HTTP_POST
graphene::utilities::es_data_adaptor::extract_data_from_variant
static void extract_data_from_variant(const fc::variant &v, fc::mutable_variant_object &mv, const std::string &prefix, uint16_t max_depth)
Extract data from v into mv.
Definition: elasticsearch.cpp:412
graphene::utilities::curl_wrapper::http_response::code
uint16_t code
Definition: elasticsearch.hpp:61
graphene::utilities::curl_wrapper::http_request_method::HTTP_PUT
@ HTTP_PUT
graphene::utilities::es_client::send_bulk
bool send_bulk(const std::vector< std::string > &bulk_lines) const
Definition: elasticsearch.cpp:232
graphene
Definition: api.cpp:48
graphene::utilities::es_data_adaptor::in_situ_adapt
static void in_situ_adapt(fc::variants &v, uint16_t max_depth)
Update directly, no return.
Definition: elasticsearch.cpp:471
elog
#define elog(FORMAT,...)
Definition: logger.hpp:129