org.elasticsearch.client.transport.TransportClient.prepareBulk()方法的使用及代码示例

x33g5p2x  于2022-01-29 转载在 其他  
字(12.1k)|赞(0)|评价(0)|浏览(175)

本文整理了Java中org.elasticsearch.client.transport.TransportClient.prepareBulk()方法的一些代码示例,展示了TransportClient.prepareBulk()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。TransportClient.prepareBulk()方法的具体详情如下:
包路径:org.elasticsearch.client.transport.TransportClient
类名称:TransportClient
方法名:prepareBulk

TransportClient.prepareBulk介绍

暂无

代码示例

代码示例来源:origin: alibaba/canal

/**
 * Updates data by primary key.
 *
 * <p>Creates a fresh bulk request, lets {@code append4Update} queue the update on it,
 * and submits it through {@code commitBulkRequest}.
 *
 * @param mapping     mapping passed through to {@code append4Update}
 * @param pkVal       primary key value of the data to update
 * @param esFieldData field data passed through to {@code append4Update}
 * @return the value returned by {@code commitBulkRequest}
 */
public boolean update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
  final BulkRequestBuilder bulk = transportClient.prepareBulk();
  append4Update(bulk, mapping, pkVal, esFieldData);
  final boolean committed = commitBulkRequest(bulk);
  return committed;
}

代码示例来源:origin: alibaba/canal

BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        bulkRequestBuilderTmp = transportClient.prepareBulk();

代码示例来源:origin: alibaba/canal

int count = 0;
try {
  BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
      bulkRequestBuilder = transportClient.prepareBulk();

代码示例来源:origin: alibaba/canal

/**
 * Deletes data by primary key.
 *
 * <p>If the mapping has an {@code _id} configured, {@code pkVal} is used directly as the
 * document id. Otherwise the documents whose primary-key field equals {@code pkVal} are
 * searched for (at most {@code MAX_BATCH_SIZE} hits) and every hit is deleted by its id.
 *
 * @param mapping mapping that supplies the index, type, {@code _id} and primary-key field
 * @param pkVal   primary key value of the data to delete
 * @return the value returned by {@code commitBulkRequest}
 */
public boolean delete(ESMapping mapping, Object pkVal) {
  final String index = mapping.get_index();
  final String type = mapping.get_type();
  final BulkRequestBuilder bulk = transportClient.prepareBulk();

  if (mapping.get_id() == null) {
    // No _id mapping: resolve the affected documents through a term query on the pk field.
    SearchResponse matches = transportClient.prepareSearch(index)
      .setTypes(type)
      .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
      .setSize(MAX_BATCH_SIZE)
      .get();
    for (SearchHit match : matches.getHits()) {
      bulk.add(transportClient.prepareDelete(index, type, match.getId()));
    }
  } else {
    // The primary key value is the document id.
    bulk.add(transportClient.prepareDelete(index, type, pkVal.toString()));
  }

  return commitBulkRequest(bulk);
}

代码示例来源:origin: alibaba/canal

/**
 * Inserts data.
 *
 * <p>If the mapping has an {@code _id} configured, the document is indexed under
 * {@code pkVal.toString()}. Otherwise every existing document whose primary-key field equals
 * {@code pkVal} (at most {@code MAX_BATCH_SIZE} hits) is queued for deletion, followed by an
 * index request without an explicit id.
 *
 * @param mapping     mapping that supplies the index, type, {@code _id} and primary-key field
 * @param pkVal       primary key value of the data to insert
 * @param esFieldData document source to index
 * @return the value returned by {@code commitBulkRequest}
 */
public boolean insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
  final String index = mapping.get_index();
  final String type = mapping.get_type();
  final BulkRequestBuilder bulk = transportClient.prepareBulk();

  if (mapping.get_id() == null) {
    // No _id mapping: queue deletes for documents already stored for this pk ...
    SearchResponse matches = transportClient.prepareSearch(index)
      .setTypes(type)
      .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
      .setSize(MAX_BATCH_SIZE)
      .get();
    for (SearchHit match : matches.getHits()) {
      bulk.add(transportClient.prepareDelete(index, type, match.getId()));
    }
    // ... then index the new document without an explicit id.
    bulk.add(transportClient.prepareIndex(index, type).setSource(esFieldData));
  } else {
    // The primary key value is the document id.
    bulk.add(transportClient.prepareIndex(index, type, pkVal.toString()).setSource(esFieldData));
  }

  return commitBulkRequest(bulk);
}

代码示例来源:origin: Impetus/Kundera

BulkRequestBuilder bulkRequest = txClient.prepareBulk().setRefresh(isRefreshIndexes());

代码示例来源:origin: Impetus/Kundera

@Override
public int executeBatch()
  BulkRequestBuilder bulkRequest = txClient.prepareBulk().setRefresh(isRefreshIndexes());

代码示例来源:origin: apache/streams

/**
 * Creates a writer bound to the given configuration and client manager.
 *
 * <p>An initial bulk request builder is obtained from the manager's client.
 *
 * @param config writer configuration to store
 * @param manager client manager to store and to obtain the bulk request builder from
 */
public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
 this.manager = manager;
 this.config = config;
 // Start out with a fresh bulk request builder from the manager's client.
 this.bulkRequest = manager.client().prepareBulk();
}

代码示例来源:origin: apache/streams

/**
 * Flushes the current bulk request, if there is one with pending items, and then
 * resets the batch counters and replaces the bulk request builder with a new one.
 */
private synchronized void flushInternal() {
 // Only act when there is a working bulk request that actually holds items.
 if (bulkRequest != null && currentBatchItems.get() != 0) {
  // Give in-flight work a chance to catch up first.
  // NOTE(review): the original comment says this waits for one minute - confirm against waitToCatchUp.
  waitToCatchUp(5, 60 * 1000);

  // Hand the accumulated batch over to flush.
  flush(bulkRequest, currentBatchItems.get(), currentBatchBytes.get());

  // Clear the statistics of the batch that was just handed over.
  currentBatchBytes.set(0);
  currentBatchItems.set(0);

  // Begin a new bulk request builder for subsequent writes.
  bulkRequest = manager.client().prepareBulk();
 }
}

代码示例来源:origin: harbby/sylph

/**
 * Builds a sniffing {@code TransportClient} for the configured cluster, registers every
 * configured {@code host:port} address on it and prepares the first bulk request builder.
 *
 * <p>If any address cannot be parsed or resolved, the partially configured client is closed
 * before the exception is propagated, so a failed open does not leak the client.
 *
 * @param partitionId not used by this method
 * @param version     not used by this method
 * @return always {@code true} when the client was set up
 * @throws Exception if an entry of {@code config.hosts} is malformed or its host cannot be resolved
 */
@Override
public boolean open(long partitionId, long version)
    throws Exception
{
  String clusterName = config.clusterName;
  String hosts = config.hosts;
  Settings settings = Settings.builder().put("cluster.name", clusterName)
      .put("client.transport.sniff", true).build();
  TransportClient client = new PreBuiltTransportClient(settings);
  try {
    for (String hostAndPort : hosts.split(",")) {
      // Split each "host:port" entry once instead of once per component.
      String[] parts = hostAndPort.split(":");
      client.addTransportAddress(
          new InetSocketTransportAddress(InetAddress.getByName(parts[0]), Integer.parseInt(parts[1])));
    }
  }
  catch (Exception e) {
    // Release the client created above; otherwise it would leak on a failed open.
    try {
      client.close();
    }
    catch (RuntimeException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  this.client = client;
  this.bulkBuilder = client.prepareBulk();
  return true;
}

代码示例来源:origin: harbby/sylph

/**
 * Builds a sniffing {@code TransportClient} for the configured cluster, registers every
 * configured {@code host:port} address on it and prepares the first bulk request builder.
 *
 * <p>If any address cannot be parsed or resolved, the partially configured client is closed
 * before the exception is propagated, so a failed open does not leak the client.
 *
 * @param partitionId not used by this method
 * @param version     not used by this method
 * @return always {@code true} when the client was set up
 * @throws Exception if an entry of {@code config.hosts} is malformed or its host cannot be resolved
 */
@Override
public boolean open(long partitionId, long version)
    throws Exception
{
  String clusterName = config.clusterName;
  String hosts = config.hosts;
  Settings settings = Settings.builder().put("cluster.name", clusterName)
      .put("client.transport.sniff", true).build();
  TransportClient client = new PreBuiltTransportClient(settings);
  try {
    for (String hostAndPort : hosts.split(",")) {
      // Split each "host:port" entry once instead of once per component.
      String[] parts = hostAndPort.split(":");
      client.addTransportAddress(
          new TransportAddress(InetAddress.getByName(parts[0]),
              Integer.parseInt(parts[1])));
    }
  }
  catch (Exception e) {
    // Release the client created above; otherwise it would leak on a failed open.
    try {
      client.close();
    }
    catch (RuntimeException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  this.client = client;
  this.bulkBuilder = client.prepareBulk();
  return true;
}

代码示例来源:origin: petterobam/database-oop

/**
 * Writes a single object to Elasticsearch through a bulk request.
 *
 * <p>The item to add is produced by {@code bulkInsertAdd}; the bulk request is executed
 * and this method blocks on {@code actionGet()} for its response.
 *
 * @param index index name
 * @param type  index type
 * @param id    id of the object to insert
 * @param o     object to insert
 * @return the bulk response of the insert
 */
public BulkResponse bulkInsert(String index, String type, String id, Object o) {
  return client.prepareBulk()
      .add(bulkInsertAdd(index, type, id, o))
      .execute()
      .actionGet();
}

代码示例来源:origin: OpenSOC/opensoc-streaming

/**
 * Indexes every JSON object currently held in {@code bulk_set} with a single bulk request
 * and then clears the set. The whole operation runs while holding the lock on {@code bulk_set}.
 *
 * <p>Failures are reported through the logger rather than {@code printStackTrace()}.
 * Per-item failures reported by the bulk response are logged as well; as before, they do not
 * change the return value and the set is still cleared.
 *
 * @return {@code true} if the bulk request was executed; {@code false} if the client is
 *         {@code null} or an exception occurred
 * @throws Exception kept in the signature for callers; exceptions raised inside are caught
 *         and turned into a {@code false} result
 */
public boolean doIndex() throws Exception {
  try {
    synchronized (bulk_set) {
      if (client == null) {
        // Report the missing client directly instead of throwing and catching an exception.
        _LOG.error("[OpenSOC] Unable to perform bulk load: client is null");
        return false;
      }
      BulkRequestBuilder bulkRequest = client.prepareBulk();
      Iterator<JSONObject> iterator = bulk_set.iterator();
      while (iterator.hasNext()) {
        JSONObject setElement = iterator.next();
        IndexRequestBuilder a = client.prepareIndex(_index_name,
            _document_name);
        a.setSource(setElement.toString());
        bulkRequest.add(a);
      }
      _LOG.trace("[OpenSOC] Performing bulk load of size: "
          + bulkRequest.numberOfActions());
      BulkResponse resp = bulkRequest.execute().actionGet();
      _LOG.trace("[OpenSOC] Received bulk response: "
          + resp.toString());
      if (resp.hasFailures()) {
        // Surface item-level failures that were previously only visible at trace level.
        _LOG.error("[OpenSOC] Bulk load reported failures: "
            + resp.buildFailureMessage());
      }
      bulk_set.clear();
    }
    return true;
  }
  catch (Exception e) {
    _LOG.error("[OpenSOC] Bulk load failed", e);
    return false;
  }
}

代码示例来源:origin: apache/streams

/**
 * Attempts to delete the given tag ids with one bulk request.
 *
 * @param ids ids of the tags to delete
 * @param index passed as the type argument of each delete request against {@code "_percolator"}
 * @return {@code true} if the bulk response reports no failures; {@code false} if {@code ids}
 *         is empty or at least one delete failed
 */
public boolean removeOldTags(Set<String> ids, String index) {
 if (ids.isEmpty()) {
  return false;
 }
 BulkRequestBuilder deletions = manager.client().prepareBulk();
 for (String tagId : ids) {
  deletions.add(manager.client().prepareDelete("_percolator", index, tagId));
 }
 boolean anyFailed = deletions.execute().actionGet().hasFailures();
 return !anyFailed;
}

代码示例来源:origin: starcwang/canal_mysql_elasticsearch_sync

/**
 * Indexes all given documents with one bulk request, using each map key as the document id.
 *
 * <p>Failures are logged, not thrown. If the calling thread is interrupted while waiting for
 * the response, the interrupt flag is restored so callers can still observe it.
 *
 * @param index     target index
 * @param type      target type
 * @param idDataMap document id mapped to the document source; an empty map is a no-op
 */
@Override
public void batchInsertById(String index, String type, Map<String, Map<String, Object>> idDataMap) {
  if (idDataMap.isEmpty()) {
    // A bulk request without actions fails request validation, so there is nothing to send.
    return;
  }
  BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
  idDataMap.forEach((id, dataMap) -> bulkRequestBuilder.add(transportClient.prepareIndex(index, type, id).setSource(dataMap)));
  try {
    BulkResponse bulkResponse = bulkRequestBuilder.execute().get();
    if (bulkResponse.hasFailures()) {
      logger.error("elasticsearch批量插入错误, index=" + index + ", type=" + type + ", data=" + JsonUtil.toJson(idDataMap) + ", cause:" + bulkResponse.buildFailureMessage());
    }
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Do not swallow the interruption: restore the flag for the caller.
      Thread.currentThread().interrupt();
    }
    logger.error("elasticsearch批量插入错误, index=" + index + ", type=" + type + ", data=" + JsonUtil.toJson(idDataMap), e);
  }
}

代码示例来源:origin: apache/streams

/**
 * Deletes every active percolate tag of the given index with one bulk request.
 *
 * @param index index whose tags, as returned by {@code getActivePercolateTags}, are deleted
 * @return {@code true} if the bulk response reports no failures; {@code false} if no active
 *         tags were found or at least one delete failed
 */
public boolean deleteOldQueries(String index) {
 Set<String> activeTags = getActivePercolateTags(index);
 if (activeTags.isEmpty()) {
  LOGGER.warn("No active tags were found in _percolator for index : {}", index);
  return false;
 }
 LOGGER.info("Deleting {} tags.", activeTags.size());
 BulkRequestBuilder deletions = manager.client().prepareBulk();
 for (String activeTag : activeTags) {
  deletions.add(manager.client().prepareDelete().setIndex(index).setType(".percolator").setId(activeTag));
 }
 boolean anyFailed = deletions.execute().actionGet().hasFailures();
 return !anyFailed;
}

代码示例来源:origin: fujitsu-pio/io

/**
 * Registers documents in bulk regardless of routing id.
 * @param index index name
 * @param bulkMap bulk documents, keyed by routing id
 * @return ES response
 */
public DcBulkResponse asyncBulkCreate(
    String index, Map<String, List<EsBulkRequest>> bulkMap) {
  BulkRequestBuilder bulk = esTransportClient.prepareBulk();
  // Issuing one bulk registration per routing id is inefficient, so every EsBulkRequest
  // passed in is registered with a single bulk request.
  // The original comment states that actionGet() is not called here because it would make
  // the call synchronous, and that the caller should call actionGet() on the returned
  // execute() response before checking the response and refreshing.
  // NOTE(review): the code below does call actionGet() itself - confirm which is intended.
  for (Entry<String, List<EsBulkRequest>> routingGroup : bulkMap.entrySet()) {
    for (EsBulkRequest document : routingGroup.getValue()) {
      IndexRequestBuilder indexRequest = esTransportClient
          .prepareIndex(index, document.getType(), document.getId())
          .setSource(document.getSource());
      if (routingFlag) {
        // The map key is used as the routing value.
        indexRequest = indexRequest.setRouting(routingGroup.getKey());
      }
      bulk.add(indexRequest);
    }
  }
  return DcBulkResponseImpl.getInstance(bulk.execute().actionGet());
}

代码示例来源:origin: judasn/Elasticsearch-Tutorial-zh-CN

/**
 * Batch update: updates the {@code product_name} field of products 1, 2 and 3 in
 * {@code product_index} with one bulk request and logs the version of every item response.
 *
 * @param transportClient client used to build and execute the bulk request
 * @throws IOException if building a JSON document fails
 */
private static void batchUpdate(TransportClient transportClient) throws IOException {
  // Each row is {document id, new product name}.
  final String[][] updates = {
      {"1", "更新后的商品名称1"},
      {"2", "更新后的商品名称2"},
      {"3", "更新后的商品名称3"},
  };

  BulkRequestBuilder bulk = transportClient.prepareBulk();
  for (String[] update : updates) {
    bulk.add(transportClient.prepareUpdate("product_index", "product", update[0])
        .setDoc(XContentFactory.jsonBuilder()
            .startObject()
            .field("product_name", update[1])
            .endObject()));
  }

  for (BulkItemResponse item : bulk.get().getItems()) {
    logger.info("--------------------------------version= " + item.getVersion());
  }
}

代码示例来源:origin: judasn/Elasticsearch-Tutorial-zh-CN

/**
 * Batch delete: deletes products 1, 2 and 3 from {@code product_index} with one bulk
 * request and logs the version of every item response.
 *
 * @param transportClient client used to build and execute the bulk request
 * @throws IOException declared by the original signature
 */
private static void batchDelete(TransportClient transportClient) throws IOException {
  final String[] productIds = {"1", "2", "3"};

  BulkRequestBuilder bulk = transportClient.prepareBulk();
  for (String productId : productIds) {
    bulk.add(transportClient.prepareDelete("product_index", "product", productId));
  }

  for (BulkItemResponse item : bulk.get().getItems()) {
    logger.info("--------------------------------version= " + item.getVersion());
  }
}

代码示例来源:origin: apache/streams

/**
 * Initializes the mapper and client manager and, when tags are configured, writes them
 * as percolate rules for the configured index.
 *
 * @param configuration not used by this method; the {@code config} field is read instead
 */
@Override
public void prepare(Object configuration) {
 mapper = StreamsJacksonMapper.getInstance();
 Objects.requireNonNull(config);
 manager = ElasticsearchClientManager.getInstance(config);

 // Nothing else to set up unless at least one tag is configured.
 if (config.getTags() == null || config.getTags().getAdditionalProperties().size() == 0) {
  return;
 }

 // initial write tags to index
 createIndexIfMissing(config.getIndex());
 if (config.getReplaceTags()) {
  deleteOldQueries(config.getIndex());
 }

 // Register one percolate rule per configured tag; the map value is the tag's query.
 for (String tagName : config.getTags().getAdditionalProperties().keySet()) {
  String tagQuery = (String) config.getTags().getAdditionalProperties().get(tagName);
  addPercolateRule(new PercolateQueryBuilder(tagName, tagQuery, this.usePercolateField), config.getIndex());
 }

 bulkBuilder = manager.client().prepareBulk();
 if (!writePercolateRules()) {
  LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
  return;
 }
 LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
}

相关文章

微信公众号

最新文章

更多