28 #include <graphene/chain/hardfork.hpp>
30 #include <boost/algorithm/string.hpp>
// Plugin configuration fields with their in-class default values.
// URL of the Elasticsearch node; passed to the es_client constructor.
51 std::string elasticsearch_url =
"http://localhost:9200/";
// Auth string passed to the es_client constructor together with the URL.
52 std::string auth =
"";
// Bulk-size limits; limit_documents is set to one or the other of these.
53 uint32_t bulk_replay = 10000;
54 uint32_t bulk_sync = 100;
// Prefix used when building index names and search URIs.
56 std::string index_prefix =
"bitshares-";
// Maximum ES mapping depth; init() asserts that this is at least 2.
59 uint16_t max_mapping_depth = 20;
// Data is only queued for ES for blocks whose number is greater than this.
61 uint32_t start_es_after_block = 0;
// Toggles for storing the operation as an object and/or as a string.
64 bool operation_object =
true;
65 bool operation_string =
false;
// Loads the fields from the parsed program options.
69 void init(
const boost::program_options::variables_map& options);
// State held by the plugin implementation.
80 plugin_options _options;
// Bulk-size threshold currently in effect (starts at the replay value).
84 uint32_t limit_documents = _options.bulk_replay;
// Client used to talk to Elasticsearch; created in init_program_options().
86 std::unique_ptr<graphene::utilities::es_client> es;
// Lines queued for the next bulk request.
88 vector <string> bulk_lines;
// Running size estimate of the queued lines; reset to 0 in send_bulk().
89 size_t approximate_bulk_size = 0;
// Index name for the block being processed (set in update_account_histories()).
93 std::string index_name;
// Filled in by es->check_version_7_or_above(); when false a "_type" field is
// added to bulk headers and "_doc" is included in search URIs.
95 bool is_es_version_7_or_above =
true;
// NOTE(review): tail of a declaration whose beginning is not visible here.
98 uint32_t block_number );
// Sends the queued bulk lines; block_num is used for logging.
99 void send_bulk( uint32_t block_num );
// Reads the options and sets up the ES client.
107 void init_program_options(
const boost::program_options::variables_map& options);
// Builds an index name as: index_prefix + first part + "-" + second part,
// where the parts come from splitting block_date_string on '-'.
// NOTE(review): parts[0] and parts[1] are read without checking parts.size();
// presumably the date string always contains at least one '-' - confirm.
111 const std::string& index_prefix )
114 std::vector<std::string> parts;
115 boost::split(parts, block_date_string, boost::is_any_of(
"-"));
116 std::string index_name = index_prefix + parts[0] +
"-" + parts[1];
// Processes the operation history of block `b`: picks the index name from the
// block timestamp, creates operation_history_object entries, works out which
// accounts each operation impacts, and records the operation for every impacted
// account through add_elasticsearch(). Pending bulk lines are flushed at the end
// when is_sync is set.
120 void elasticsearch_plugin_impl::update_account_histories(
const signed_block& b )
122 checkState(b.timestamp);
123 index_name = generateIndexName(b.timestamp, _options.index_prefix);
// Helper that uses up an operation-history id without keeping an object:
// on the first call with the undo db enabled it creates and immediately removes
// an empty object; use_next_id() is the other visible path.
// NOTE(review): the branch structure between these statements is not visible.
127 bool is_first =
true;
128 auto skip_oho_id = [&is_first,&db,
this]() {
129 if( is_first && db._undo_db.enabled() )
131 db.remove( db.create<operation_history_object>( []( operation_history_object& obj) {} ) );
135 _oho_index->use_next_id();
137 for(
const optional< operation_history_object >& o_op : hist ) {
138 optional <operation_history_object> oho;
// Helper that creates a database object populated from the fields of *o_op.
140 auto create_oho = [&]() {
142 return optional<operation_history_object>(
143 db.create<operation_history_object>([&](operation_history_object &h) {
147 h.result = o_op->result;
148 h.block_num = o_op->block_num;
149 h.trx_in_block = o_op->trx_in_block;
150 h.op_in_trx = o_op->op_in_trx;
151 h.virtual_op = o_op->virtual_op;
152 h.is_virtual = o_op->is_virtual;
153 h.block_time = o_op->block_time;
// NOTE(review): handling of an empty o_op is not visible in this excerpt.
158 if( !o_op.valid() ) {
// ES-specific data is only prepared for blocks after the configured start block.
165 if( o_op->block_num > _options.start_es_after_block )
170 doBlock( oho->trx_in_block, b, bulk_line_struct.
block_data );
171 if( _options.visitor )
175 const operation_history_object& op = *o_op;
// Collect the accounts impacted by this operation.
178 flat_set<account_id_type> impacted;
179 vector<authority> other;
182 MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
// For an account creation, the account id taken from the operation result is impacted too.
184 if( op.op.is_type< account_create_operation >() )
185 impacted.insert( account_id_type( op.result.get<object_id_type>() ) );
188 if( HARDFORK_CORE_265_PASSED(b.timestamp) || !op.op.is_type< account_create_operation >() )
191 MUST_IGNORE_CUSTOM_OP_REQD_AUTHS( db.head_block_time() ) );
// Accounts listed in the operation result, when present, are added as well.
197 if( op_result.value.impacted_accounts.valid() )
199 for(
const auto& a : *op_result.value.impacted_accounts )
200 impacted.insert( a );
// Every account referenced in the account_auths of the `other` authorities is added.
204 for(
const auto& a : other )
205 for(
const auto& item : a.account_auths )
206 impacted.insert( item.first );
// Record the operation once per impacted account.
208 for(
const auto& account_id : impacted )
211 add_elasticsearch( account_id, oho, b.block_num() );
// When in sync, do not leave lines queued: flush whatever is pending.
217 if( is_sync && !bulk_lines.empty() )
218 send_bulk( b.block_num() );
// Sends the queued bulk_lines to Elasticsearch through es->send_bulk().
// Logs the line count, block number and approximate size beforehand. When the
// send reports failure, logs the error and dumps at most the first 10 lines.
// Afterwards the size estimate is reset and capacity is reserved for the next batch.
// @param block_num block number, used in the log message
222 void elasticsearch_plugin_impl::send_bulk( uint32_t block_num )
224 ilog(
"Sending ${n} lines of bulk data to ElasticSearch at block ${b}, approximate size ${s}",
225 (
"n",bulk_lines.size())(
"b",block_num)(
"s",approximate_bulk_size) );
226 if( !es->send_bulk( bulk_lines ) )
228 elog(
"Error sending ${n} lines of bulk data to ElasticSearch, the first lines are:",
229 (
"n",bulk_lines.size()) );
// Limit the dump to the first 10 lines.
230 const auto log_max = std::min( bulk_lines.size(),
size_t(10) );
231 for(
size_t i = 0; i < log_max; ++i )
233 edump( (bulk_lines[i]) );
// NOTE(review): the statement that uses the following message is not visible here.
236 "Error populating ES database, we are going to keep trying." );
239 approximate_bulk_size = 0;
240 bulk_lines.reserve(limit_documents);
// Fragment: switches the active bulk threshold between the sync and replay
// values, then reserves capacity in bulk_lines accordingly.
// NOTE(review): the enclosing function and the conditions selecting each
// assignment are not visible in this excerpt.
247 limit_documents = _options.bulk_sync;
252 limit_documents = _options.bulk_replay;
255 bulk_lines.reserve(limit_documents);
// Generic helper, templated on the operation type, that returns op.fee_payer().
// NOTE(review): its signature and enclosing type are not visible here.
262 template<
typename OpType>
265 return op.fee_payer();
// Fragment: the operation is handled according to the operation_string and
// operation_object options. In the object case the depth budget passed on is
// max_mapping_depth reduced by current_depth (2).
278 if(_options.operation_string)
283 if(_options.operation_object) {
284 constexpr uint16_t current_depth = 2;
288 _options.max_mapping_depth - current_depth );
293 _options.max_mapping_depth - current_depth );
// Fills block-level fields of `bs` from block `b`.
// @param trx_in_block index of the transaction within the block
// @param b            the block being processed
// @param bs           output structure; block_num and block_time are set here
297 void elasticsearch_plugin_impl::doBlock(uint32_t trx_in_block,
const signed_block& b, block_struct& bs)
const
// trx_id stays empty unless trx_in_block is a valid index into b.transactions.
299 std::string trx_id =
"";
300 if(trx_in_block < b.transactions.size())
301 trx_id = b.transactions[trx_in_block].id().str();
302 bs.block_num = b.block_num();
303 bs.block_time = b.timestamp;
// Fragments of the operation visitor and its use. The visitor records fields of
// the visited operation (transfer sender, fee asset id and fee amount are shown).
// The asset ids are then resolved against `db`, and a fill price is derived from
// the receives and pays amounts.
// NOTE(review): most of this code is not visible in the excerpt; the full
// fill_price expression cannot be confirmed from here.
326 transfer_from = o.
from;
357 fee_asset = o.fee.asset_id;
358 fee_amount = o.fee.amount;
366 operation_visitor o_v;
369 auto fee_asset = o_v.fee_asset(db);
375 auto transfer_asset = o_v.transfer_asset_id(db);
384 auto fill_pays_asset = o_v.fill_pays_asset_id(db);
385 auto fill_receives_asset = o_v.fill_receives_asset_id(db);
399 auto fill_price = (o_v.fill_receives_amount.value
401 / (o_v.fill_pays_amount.value
// Records operation `oho` in the history of `account_id` and, for blocks after
// the configured start block, queues the matching bulk lines for Elasticsearch.
// @param account_id   account whose history receives the entry
// @param oho          operation history object; it is dereferenced here, so it must hold a value
// @param block_number number of the block containing the operation
408 void elasticsearch_plugin_impl::add_elasticsearch(
const account_id_type& account_id,
409 const optional<operation_history_object>& oho,
410 uint32_t block_number )
// Create the new history entry; it links to the account's previous most recent entry.
416 const auto &ath = db.
create<account_history_object>(
417 [&oho,&account_id,&stats_obj]( account_history_object &obj ) {
418 obj.operation_id = oho->id;
419 obj.account = account_id;
420 obj.sequence = stats_obj.total_ops + 1;
421 obj.next = stats_obj.most_recent_op;
// Point the account statistics at the entry just created.
424 db.
modify( stats_obj, [&ath]( account_statistics_object &obj ) {
425 obj.most_recent_op = ath.id;
426 obj.total_ops = ath.sequence;
429 if( block_number > _options.start_es_after_block )
431 bulk_line_struct.account_history = ath;
// Bulk header: target index, document id and, for ES older than 7, the "_type" field.
436 bulk_header[
"_index"] = index_name;
437 if( !is_es_version_7_or_above )
438 bulk_header[
"_type"] =
"_doc";
439 bulk_header[
"_id"] = std::string( ath.id );
441 std::move(prepare.begin(), prepare.end(), std::back_inserter(bulk_lines));
// Only the size of the last appended line is added to the estimate.
443 approximate_bulk_size += bulk_lines.back().size();
// NOTE(review): the rest of this condition is not visible in the excerpt.
445 if( bulk_lines.size() >= limit_documents
447 send_bulk( block_number );
449 cleanObjects(ath, account_id);
// Prunes older in-database history of `account_id` after `ath` has been created.
// @param ath        the history entry that was just created (never the one removed)
// @param account_id account whose history is pruned
452 void elasticsearch_plugin_impl::cleanObjects(
const account_history_object& ath,
453 const account_id_type& account_id )
// Locate the account's lowest-sequence entry.
458 const auto &by_seq_idx = his_idx.indices().get<by_seq>();
459 auto itr = by_seq_idx.lower_bound(boost::make_tuple(account_id, 0));
460 if (itr != by_seq_idx.end() && itr->account == account_id && itr->id != ath.id) {
// NOTE(review): the statements that advance itr and remove itr_remove are not visible here.
462 const auto remove_op_id = itr->operation_id;
463 const auto itr_remove = itr;
// If the account still has an entry at itr, reset its `next` link to the default id.
468 if( itr != by_seq_idx.end() && itr->account == account_id )
470 db.
modify( *itr, []( account_history_object& obj ){
471 obj.next = account_history_id_type();
// Remove the operation object itself when no history entry references it any more.
475 const auto &by_opid_idx = his_idx.indices().get<by_opid>();
476 if (by_opid_idx.find(remove_op_id) == by_opid_idx.end()) {
477 db.
remove(remove_op_id(db));
// Fragments of three plugin members: the initializer that creates the
// implementation object `my`, and two returned strings.
// NOTE(review): the enclosing signatures are not visible; presumably the
// strings are the plugin name and its description - confirm.
486 my(
std::make_unique<detail::elasticsearch_plugin_impl>(*this) )
495 return "elasticsearch";
499 return "Stores account history data in elasticsearch database(EXPERIMENTAL).";
// Registers the plugin's "elasticsearch-*" options: option name, value type and help text.
// NOTE(review): the function header and the object these entries are added to are not visible here.
503 boost::program_options::options_description& cli,
504 boost::program_options::options_description& cfg
508 (
"elasticsearch-node-url", boost::program_options::value<std::string>(),
509 "Elastic Search database node url(http://localhost:9200/)")
510 (
"elasticsearch-basic-auth", boost::program_options::value<std::string>(),
511 "Pass basic auth to elasticsearch database('')")
512 (
"elasticsearch-bulk-replay", boost::program_options::value<uint32_t>(),
513 "Number of bulk documents to index on replay(10000)")
// NOTE(review): "syncronied" in the help text below looks like a typo for "synchronized".
514 (
"elasticsearch-bulk-sync", boost::program_options::value<uint32_t>(),
515 "Number of bulk documents to index on a syncronied chain(100)")
516 (
"elasticsearch-index-prefix", boost::program_options::value<std::string>(),
517 "Add a prefix to the index(bitshares-)")
518 (
"elasticsearch-max-mapping-depth", boost::program_options::value<uint16_t>(),
519 "The maximum index mapping depth (index.mapping.depth.limit) setting in ES, "
520 "should be >=2. (20)")
521 (
"elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(),
522 "Start doing ES job after block(0)")
523 (
"elasticsearch-visitor", boost::program_options::value<bool>(),
524 "Use visitor to index additional data(slows down the replay(false))")
525 (
"elasticsearch-operation-object", boost::program_options::value<bool>(),
526 "Save operation as object(true)")
527 (
"elasticsearch-operation-string", boost::program_options::value<bool>(),
528 "Save operation as string. Needed to serve history api calls(false)")
529 (
"elasticsearch-mode", boost::program_options::value<uint16_t>(),
530 "Mode of operation: only_save(0), only_query(1), all(2) - Default: 0")
// Loads the plugin options, creates the Elasticsearch client, asserts that the
// server is reachable and queries whether its version is 7 or above.
// @param options parsed program options
535 void detail::elasticsearch_plugin_impl::init_program_options(
const boost::program_options::variables_map& options)
537 _options.init( options );
// NOTE(review): the statement guarded by this check is not visible here.
539 if( _options.visitor )
542 es = std::make_unique<graphene::utilities::es_client>( _options.elasticsearch_url, _options.auth );
// Fail early when the ES status check does not succeed.
544 FC_ASSERT( es->check_status(),
"ES database is not up in url ${url}", (
"url", _options.elasticsearch_url) );
546 es->check_version_7_or_above( is_es_version_7_or_above );
// Loads and validates the option values.
// Visible checks: max_mapping_depth must be at least 2; the numeric mode must not
// exceed mode::all; mode::all additionally requires operation_string to be set.
// NOTE(review): the code that reads the individual options is not visible here.
// @param options parsed program options
549 void detail::elasticsearch_plugin_impl::plugin_options::init(
const boost::program_options::variables_map& options)
562 FC_ASSERT( max_mapping_depth >= 2,
"The minimum value of elasticsearch-max-mapping-depth is 2" );
// The mode is range-checked as an integer before being converted back to the enum.
564 auto es_mode =
static_cast<uint16_t
>( elasticsearch_mode );
566 if( es_mode >
static_cast<uint16_t
>(
mode::all ) )
567 FC_THROW_EXCEPTION( graphene::chain::plugin_exception,
"Elasticsearch mode not valid" );
568 elasticsearch_mode =
static_cast<mode>( es_mode );
570 if(
mode::all == elasticsearch_mode && !operation_string )
573 "If elasticsearch-mode is set to all then elasticsearch-operation-string need to be true");
// Fragments: the plugin forwards option initialization and per-block history
// updates to its implementation object.
// NOTE(review): the enclosing functions are not visible in this excerpt.
579 my->init_program_options( options );
588 my->update_account_histories(b);
// Fragment: reads the "operation_id" field of the "account_history" member of an
// ES document `source`. NOTE(review): the enclosing function is not visible here.
602 const auto operation_id = source[
"account_history"][
"operation_id"];
// Looks up a single operation in Elasticsearch by its operation id: builds a
// query on "account_history.operation_id", runs it against all indices that share
// the configured prefix (with "_doc" in the URI for ES older than 7), and
// converts the "_source" of the first hit with fromEStoOperation().
// NOTE(review): the first hit is indexed directly; handling of an empty result
// is not visible in this excerpt.
624 const string operation_id_string = std::string(
object_id_type(
id));
626 const string query = R
"(
631 "account_history.operation_id": ")" + operation_id_string + R"("
637 const auto uri = my->_options.index_prefix + ( my->is_es_version_7_or_above ?
"*/_search" :
"*/_doc/_search" );
638 const auto response = my->es->query( uri, query );
640 const auto source = variant_response[
"hits"][
"hits"][size_t(0)][
"_source"];
641 return fromEStoOperation(source);
// Queries Elasticsearch for the operation history of an account: filters on
// "account_history.account" plus an id range derived from start/stop, sorted by
// "operation_id_num" descending. The number of hits read is capped at `limit`,
// and each hit's "_source" is converted with fromEStoOperation().
// NOTE(review): parts of the signature, the range construction and the handling
// of a failed status check are not visible in this excerpt.
645 const account_id_type& account_id,
646 const operation_history_id_type& stop,
648 const operation_history_id_type& start )
const
650 const auto account_id_string = std::string( account_id );
652 const auto stop_number = stop.instance.value;
653 const auto start_number = start.instance.value;
658 else if(stop_number > 0)
662 const string query = R
"(
665 "sort" : [{ "operation_id_num" : {"order" : "desc"}}],
671 "query": "account_history.account: )" + account_id_string + range + R"("
680 vector<operation_history_object> result;
682 if( !my->es->check_status() )
685 const auto uri = my->_options.index_prefix + ( my->is_es_version_7_or_above ?
"*/_search" :
"*/_doc/_search" );
686 const auto response = my->es->query( uri, query );
690 const auto hits = variant_response[
"hits"][
"total"];
692 if( hits.is_object() )
695 size = hits.as_uint64();
696 size = std::min( size,
size_t(limit) );
698 const auto& data = variant_response[
"hits"][
"hits"];
699 for(
size_t i=0; i<size; ++i )
701 const auto& source = data[i][
"_source"];
702 result.push_back(fromEStoOperation(source));
// Returns the configured elasticsearch_mode. NOTE(review): the enclosing signature is not visible here.
709 return my->_options.elasticsearch_mode;