38 template<u
int8_t SpaceID, u
int8_t TypeID>
71 std::string elasticsearch_url =
"http://localhost:9200/";
72 std::string auth =
"";
73 uint32_t bulk_replay = 10000;
74 uint32_t bulk_sync = 100;
77 object_options accounts {
true,
false,
true,
"account" };
78 object_options assets {
true,
false,
true,
"asset" };
79 object_options balances {
true,
false,
true,
"balance" };
80 object_options limit_orders {
true,
false,
false,
"limitorder" };
81 object_options asset_bitasset {
true,
false,
true,
"bitasset" };
82 object_options budget {
true,
false,
true,
"budget" };
84 std::string index_prefix =
"objects-";
88 uint16_t max_mapping_depth = 10;
90 uint32_t start_es_after_block = 0;
91 bool sync_db_on_startup =
false;
93 void init(
const boost::program_options::variables_map& options);
96 enum class action_type
103 void on_objects_create(
const vector<object_id_type>& ids)
104 { index_database( ids, action_type::insertion ); }
106 void on_objects_update(
const vector<object_id_type>& ids)
107 { index_database( ids, action_type::update ); }
109 void on_objects_delete(
const vector<object_id_type>& ids)
110 { index_database( ids, action_type::deletion ); }
112 void index_database(
const vector<object_id_type>& ids, action_type action);
114 void sync_db(
bool delete_before_load =
false );
116 void delete_from_database(
const object_id_type&
id,
const plugin_options::object_options& opt );
118 void delete_all_from_database(
const plugin_options::object_options& opt )
const;
120 es_objects_plugin& _self;
121 plugin_options _options;
123 uint32_t limit_documents = _options.bulk_replay;
125 uint64_t docs_sent_batch = 0;
126 uint64_t docs_sent_total = 0;
128 std::unique_ptr<graphene::utilities::es_client> es;
130 vector<std::string> bulk_lines;
131 size_t approximate_bulk_size = 0;
133 uint32_t block_number = 0;
135 bool is_es_version_7_or_above =
true;
138 void prepareTemplate(
const T& blockchain_object,
const plugin_options::object_options& opt );
140 void init_program_options(
const boost::program_options::variables_map& options);
142 void send_bulk_if_ready(
bool force =
false );
155 template<
typename ObjType>
157 bool force_delete =
false )
165 ilog(
"Deleting all data in index " +
my->_options.index_prefix + opt.
index_name );
166 my->delete_all_from_database( opt );
169 ilog(
"Loading data into index " +
my->_options.index_prefix + opt.
index_name );
172 my->prepareTemplate(
static_cast<const ObjType&
>(o), opt );
174 my->send_bulk_if_ready(
true);
175 my->docs_sent_batch = 0;
179 void es_objects_plugin_impl::sync_db(
bool delete_before_load )
181 ilog(
"elasticsearch OBJECTS: loading data from the object database (chain state)");
190 loader.load<
account_object >( _options.accounts, delete_before_load );
191 loader.load<
asset_object >( _options.assets, delete_before_load );
194 loader.load<
proposal_object >( _options.proposals, delete_before_load );
198 ilog(
"elasticsearch OBJECTS: done loading data from the object database (chain state)");
201 void es_objects_plugin_impl::index_database(
const vector<object_id_type>& ids, action_type action)
207 if( block_number <= _options.start_es_after_block )
214 limit_documents = _options.bulk_sync;
216 limit_documents = _options.bulk_replay;
218 bulk_lines.reserve(limit_documents);
220 static const unordered_map<uint16_t,plugin_options::object_options&> data_type_map = {
221 { account_id_type::space_type, _options.accounts },
222 { account_balance_id_type::space_type, _options.balances },
223 { asset_id_type::space_type, _options.assets },
224 { asset_bitasset_data_id_type::space_type, _options.asset_bitasset },
225 { limit_order_id_type::space_type, _options.limit_orders },
226 { proposal_id_type::space_type, _options.proposals },
227 { budget_record_id_type::space_type, _options.budget }
230 for(
const auto& value: ids )
232 const auto itr = data_type_map.find( value.space_type() );
233 if( itr == data_type_map.end() || !(itr->second.enabled) )
235 const auto& opt = itr->second;
236 if( action_type::deletion == action )
237 delete_from_database( value, opt );
242 case account_id_type::space_type:
243 prepareTemplate( db.
get<account_object>(value), opt );
245 case account_balance_id_type::space_type:
246 prepareTemplate( db.
get<account_balance_object>(value), opt );
248 case asset_id_type::space_type:
249 prepareTemplate( db.
get<asset_object>(value), opt );
251 case asset_bitasset_data_id_type::space_type:
252 prepareTemplate( db.
get<asset_bitasset_data_object>(value), opt );
254 case limit_order_id_type::space_type:
255 prepareTemplate( db.
get<limit_order_object>(value), opt );
257 case proposal_id_type::space_type:
258 prepareTemplate( db.
get<proposal_object>(value), opt );
260 case budget_record_id_type::space_type:
261 prepareTemplate( db.
get<budget_record_object>(value), opt );
271 void es_objects_plugin_impl::delete_from_database(
272 const object_id_type&
id,
const es_objects_plugin_impl::plugin_options::object_options& opt )
278 delete_line[
"_id"] = string(
id);
279 delete_line[
"_index"] = _options.index_prefix + opt.index_name;
280 if( !is_es_version_7_or_above )
281 delete_line[
"_type"] =
"_doc";
283 final_delete_line[
"delete"] = std::move( delete_line );
287 approximate_bulk_size += bulk_lines.back().size();
289 send_bulk_if_ready();
292 void es_objects_plugin_impl::delete_all_from_database(
const plugin_options::object_options& opt )
const
300 es->query( _options.index_prefix + opt.index_name +
"/_delete_by_query", R
"({"query":{"match_all":{}}})" );
304 void es_objects_plugin_impl::prepareTemplate(
305 const T& blockchain_object,
const es_objects_plugin_impl::plugin_options::object_options& opt )
308 bulk_header[
"_index"] = _options.index_prefix + opt.index_name;
309 if( !is_es_version_7_or_above )
310 bulk_header[
"_type"] =
"_doc";
311 if( !opt.store_updates )
313 bulk_header[
"_id"] = string(blockchain_object.id);
319 _options.max_mapping_depth ) );
321 o[
"object_id"] = string(blockchain_object.id);
322 o[
"block_time"] = block_time;
323 o[
"block_number"] = block_number;
328 std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
330 approximate_bulk_size += bulk_lines.back().size();
332 send_bulk_if_ready();
335 void es_objects_plugin_impl::send_bulk_if_ready(
bool force )
337 if( bulk_lines.empty() )
339 if( !force && bulk_lines.size() < limit_documents
342 constexpr uint32_t log_count_threshold = 20000;
343 constexpr uint32_t log_time_threshold = 3600;
344 static uint64_t next_log_count = log_count_threshold;
346 docs_sent_batch += bulk_lines.size();
347 docs_sent_total += bulk_lines.size();
348 bool log_by_next = ( docs_sent_total >= next_log_count ||
fc::time_point::now() >= next_log_time );
349 if( log_by_next || limit_documents == _options.bulk_replay || force )
351 ilog(
"Sending ${n} lines of bulk data to ElasticSearch at block ${blk}, "
352 "this batch ${b}, total ${t}, approximate size ${s}",
353 (
"n",bulk_lines.size())(
"blk",block_number)
354 (
"b",docs_sent_batch)(
"t",docs_sent_total)(
"s",approximate_bulk_size) );
355 next_log_count = docs_sent_total + log_count_threshold;
359 if( !es->send_bulk( bulk_lines ) )
361 elog(
"Error sending ${n} lines of bulk data to ElasticSearch, the first lines are:",
362 (
"n",bulk_lines.size()) );
363 const auto log_max = std::min( bulk_lines.size(),
size_t(10) );
364 for(
size_t i = 0; i < log_max; ++i )
366 edump( (bulk_lines[i]) );
369 "Error populating ES database, we are going to keep trying." );
372 bulk_lines.reserve(limit_documents);
373 approximate_bulk_size = 0;
380 my(
std::make_unique<detail::es_objects_plugin_impl>(*this) )
393 return "Stores blockchain objects in ES database. Experimental.";
397 boost::program_options::options_description& cli,
398 boost::program_options::options_description& cfg
402 (
"es-objects-elasticsearch-url", boost::program_options::value<std::string>(),
403 "Elasticsearch node url(http://localhost:9200/)")
404 (
"es-objects-auth", boost::program_options::value<std::string>(),
"Basic auth username:password('')")
405 (
"es-objects-bulk-replay", boost::program_options::value<uint32_t>(),
406 "Number of bulk documents to index on replay(10000)")
407 (
"es-objects-bulk-sync", boost::program_options::value<uint32_t>(),
408 "Number of bulk documents to index on a synchronized chain(100)")
410 (
"es-objects-proposals", boost::program_options::value<bool>(),
"Store proposal objects (true)")
411 (
"es-objects-proposals-store-updates", boost::program_options::value<bool>(),
412 "Store all updates to the proposal objects (false)")
413 (
"es-objects-proposals-no-delete", boost::program_options::value<bool>(),
414 "Do not delete a proposal from ES even if it is deleted from chain state. "
415 "It is implicitly true and can not be set to false if es-objects-proposals-store-updates is true. "
418 (
"es-objects-accounts", boost::program_options::value<bool>(),
"Store account objects (true)")
419 (
"es-objects-accounts-store-updates", boost::program_options::value<bool>(),
420 "Store all updates to the account objects (false)")
422 (
"es-objects-assets", boost::program_options::value<bool>(),
"Store asset objects (true)")
423 (
"es-objects-assets-store-updates", boost::program_options::value<bool>(),
424 "Store all updates to the asset objects (false)")
426 (
"es-objects-balances", boost::program_options::value<bool>(),
"Store account balances (true)")
427 (
"es-objects-balances-store-updates", boost::program_options::value<bool>(),
428 "Store all updates to the account balances (false)")
430 (
"es-objects-limit-orders", boost::program_options::value<bool>(),
"Store limit order objects (true)")
431 (
"es-objects-limit-orders-store-updates", boost::program_options::value<bool>(),
432 "Store all updates to the limit orders (false)")
433 (
"es-objects-limit-orders-no-delete", boost::program_options::value<bool>(),
434 "Do not delete a limit order object from ES even if it is deleted from chain state. "
435 "It is implicitly true and can not be set to false if es-objects-limit-orders-store-updates is true. "
438 (
"es-objects-asset-bitasset", boost::program_options::value<bool>(),
439 "Store bitasset data, including price feeds (true)")
440 (
"es-objects-asset-bitasset-store-updates", boost::program_options::value<bool>(),
441 "Store all updates to the bitasset data (false)")
443 (
"es-objects-budget-records", boost::program_options::value<bool>(),
"Store budget records (true)")
445 (
"es-objects-index-prefix", boost::program_options::value<std::string>(),
446 "Add a prefix to the index(objects-)")
447 (
"es-objects-max-mapping-depth", boost::program_options::value<uint16_t>(),
448 "Can not exceed the maximum index mapping depth (index.mapping.depth.limit) setting in ES, "
449 "and need to be even smaller to not trigger the index.mapping.total_fields.limit error (10)")
450 (
"es-objects-keep-only-current", boost::program_options::value<bool>(),
451 "Deprecated. Please use the store-updates or no-delete options. "
452 "Keep only current state of the objects(true)")
453 (
"es-objects-start-es-after-block", boost::program_options::value<uint32_t>(),
454 "Start doing ES job after block(0)")
455 (
"es-objects-sync-db-on-startup", boost::program_options::value<bool>(),
456 "Copy all applicable objects from the object database (chain state) to ES on program startup (false)")
461 void detail::es_objects_plugin_impl::init_program_options(
const boost::program_options::variables_map& options)
463 _options.init( options );
465 es = std::make_unique<graphene::utilities::es_client>( _options.elasticsearch_url, _options.auth );
467 FC_ASSERT( es->check_status(),
"ES database is not up in url ${url}", (
"url", _options.elasticsearch_url) );
469 es->check_version_7_or_above( is_es_version_7_or_above );
472 void detail::es_objects_plugin_impl::plugin_options::init(
const boost::program_options::variables_map& options)
501 my->init_program_options( options );
504 const flat_set<account_id_type>& ) {
505 my->on_objects_create( ids );
508 const flat_set<account_id_type>& ) {
509 my->on_objects_update( ids );
512 const vector<const object*>&,
const flat_set<account_id_type>& ) {
513 my->on_objects_delete( ids );
520 if( 0 ==
database().head_block_num() )
522 else if( my->_options.sync_db_on_startup )
528 my->send_bulk_if_ready(
true);