org.elasticsearch.action.bulk.BulkRequest.numberOfActions()方法的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(108)

本文整理了Java中org.elasticsearch.action.bulk.BulkRequest.numberOfActions()方法的一些代码示例,展示了BulkRequest.numberOfActions()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。BulkRequest.numberOfActions()方法的具体详情如下:
包路径:org.elasticsearch.action.bulk.BulkRequest
类名称:BulkRequest
方法名:numberOfActions

BulkRequest.numberOfActions介绍

[英]The number of actions in the bulk request.
[中]批量请求中的操作数。

代码示例

代码示例来源:origin: brianway/webporter

@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
  logger.info("bulk request numberOfActions:" + bulkRequest.numberOfActions());
}

代码示例来源:origin: richardwilly98/elasticsearch-river-mongodb

@Override
public void beforeBulk(long executionId, BulkRequest request) {
  checkBulkProcessorAvailability();
  logger.trace("beforeBulk - new bulk [{}] of items [{}]", executionId, request.numberOfActions());
  if (flushBulkProcessor.get()) {
    logger.trace("About to flush bulk request index[{}] - type[{}]", index, type);
    int dropDollectionIndex = findLastDropCollection(request.requests());
    request.requests().subList(0, dropDollectionIndex + 1).clear();
    try {
      dropRecreateMapping();
      deletedDocuments.set(0);
      updatedDocuments.set(0);
      insertedDocuments.set(0);
      flushBulkProcessor.set(false);
    } catch (Throwable t) {
      logger.error("Drop collection operation failed", t);
      MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.IMPORT_FAILED);
      request.requests().clear();
      bulkProcessor.close();
      river.close();
    }
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * The number of actions currently in the bulk.
 */
public int numberOfActions() {
  return request.numberOfActions();
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
  public void run() {
    synchronized (BulkProcessor.this) {
      if (closed) {
        return;
      }
      if (bulkRequest.numberOfActions() == 0) {
        return;
      }
      execute();
    }
  }
}

代码示例来源:origin: apache/flink

@Override
  public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    while (nextBulkRequest.numberOfActions() > 0) {
      // wait until we are allowed to continue with the flushing
      flushLatch.await();
      // create a copy of the accumulated mock requests, so that
      // re-added requests from the failure handler are included in the next bulk
      BulkRequest currentBulkRequest = nextBulkRequest;
      nextBulkRequest = new BulkRequest();
      listener.beforeBulk(123L, currentBulkRequest);
      if (nextBulkFailure == null) {
        BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
        for (int i = 0; i < currentBulkRequest.requests().size(); i++) {
          Throwable mockItemFailure = mockItemFailuresList.get(i);
          if (mockItemFailure == null) {
            // the mock response for the item is success
            mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class));
          } else {
            // the mock response for the item is failure
            mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
          }
        }
        listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L));
      } else {
        listener.afterBulk(123L, currentBulkRequest, nextBulkFailure);
      }
    }
    return null;
  }
}).when(mockBulkProcessor).flush();

代码示例来源:origin: dadoonet/fscrawler

@Override public void beforeBulk(long executionId, BulkRequest request) {
  logger.trace("Sending a bulk request of [{}] requests", request.numberOfActions());
}

代码示例来源:origin: dadoonet/fscrawler

@Override public void beforeBulk(long executionId, BulkRequest request) {
  logger.trace("Sending a bulk request of [{}] requests", request.numberOfActions());
}

代码示例来源:origin: dadoonet/fscrawler

@Override public void beforeBulk(long executionId, BulkRequest request) {
  logger.trace("Sending a bulk request of [{}] requests", request.numberOfActions());
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

if (request.numberOfActions() > 0) {
  BulkResponse response;
  try {

代码示例来源:origin: org.elasticsearch/elasticsearch

private boolean isOverTheLimit() {
  if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
    return true;
  }
  if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
    return true;
  }
  return false;
}

代码示例来源:origin: spring-projects/spring-data-elasticsearch

for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
  DocWriteRequest<?> action = bulkRequest.requests().get(i);

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Flush pending delete or index requests.
 */
public synchronized void flush() {
  ensureOpen();
  if (bulkRequest.numberOfActions() > 0) {
    execute();
  }
}

代码示例来源:origin: dadoonet/fscrawler

@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
  logger.trace("Executed bulk request with [{}] requests", request.numberOfActions());
  if (response.hasFailures()) {
    final int[] failures = {0};
    response.iterator().forEachRemaining(bir -> {
      if (bir.isFailed()) {
        failures[0]++;
        logger.debug("Error caught for [{}]/[{}]/[{}]: {}", bir.getIndex(),
            bir.getType(), bir.getId(), bir.getFailureMessage());
      }
    });
    logger.warn("Got [{}] failures of [{}] requests", failures[0], request.numberOfActions());
  }
}

代码示例来源:origin: dadoonet/fscrawler

@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
  logger.trace("Executed bulk request with [{}] requests", request.numberOfActions());
  if (response.hasFailures()) {
    final int[] failures = {0};
    response.iterator().forEachRemaining(bir -> {
      if (bir.isFailed()) {
        failures[0]++;
        logger.debug("Error caught for [{}]/[{}]/[{}]: {}", bir.getIndex(),
            bir.getType(), bir.getId(), bir.getFailureMessage());
      }
    });
    logger.warn("Got [{}] failures of [{}] requests", failures[0], request.numberOfActions());
  }
}

代码示例来源:origin: dadoonet/fscrawler

@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
  logger.trace("Executed bulk request with [{}] requests", request.numberOfActions());
  if (response.hasFailures()) {
    final int[] failures = {0};
    response.iterator().forEachRemaining(bir -> {
      if (bir.isFailed()) {
        failures[0]++;
        logger.debug("Error caught for [{}]/[{}]/[{}]: {}", bir.getIndex(),
            bir.getType(), bir.getId(), bir.getFailureMessage());
      }
    });
    logger.warn("Got [{}] failures of [{}] requests", failures[0], request.numberOfActions());
  }
}

代码示例来源:origin: tomoya92/pybbs

public void bulkDocument(String type, Map<String, Map<String, Object>> sources) {
 try {
  if (this.instance() == null) return;
  BulkRequest requests = new BulkRequest();
  Iterator<String> it = sources.keySet().iterator();
  int count = 0;
  while(it.hasNext()) {
   count++;
   String next = it.next();
   IndexRequest request = new IndexRequest(name, type, next);
   request.source(sources.get(next));
   requests.add(request);
   if (count % 1000 == 0) {
    client.bulk(requests, RequestOptions.DEFAULT);
    requests.requests().clear();
    count = 0;
   }
  }
  if (requests.numberOfActions() > 0) client.bulk(requests, RequestOptions.DEFAULT);
 } catch (IOException e) {
  log.error(e.getMessage());
 }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
 * <p>
 * If concurrent requests are not enabled, returns {@code true} immediately.
 * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true}
 * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
 *
 * @param timeout The maximum time to wait for the bulk requests to complete
 * @param unit    The time unit of the {@code timeout} argument
 * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests
 * completed
 * @throws InterruptedException If the current thread is interrupted
 */
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
  if (closed) {
    return true;
  }
  closed = true;
  this.cancellableFlushTask.cancel();
  if (bulkRequest.numberOfActions() > 0) {
    execute();
  }
  try {
    return this.bulkRequestHandler.awaitClose(timeout, unit);
  } finally {
    onClose.run();
  }
}

代码示例来源:origin: tomoya92/pybbs

public void bulkDeleteDocument(String type, List<Integer> ids) {
 try {
  if (this.instance() == null) return;
  BulkRequest requests = new BulkRequest();
  int count = 0;
  for (Integer id: ids) {
   count++;
   DeleteRequest request = new DeleteRequest(name, type, String.valueOf(id));
   requests.add(request);
   if (count % 1000 == 0) {
    client.bulk(requests, RequestOptions.DEFAULT);
    requests.requests().clear();
    count = 0;
   }
  }
  if (requests.numberOfActions() > 0) client.bulk(requests, RequestOptions.DEFAULT);
 } catch (IOException e) {
  log.error(e.getMessage());
 }
}

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.elasticsearch

/**
   * The number of actions currently in the bulk.
   */
  public int numberOfActions() {
    return request.numberOfActions();
  }
}

代码示例来源:origin: DigitalPebble/storm-crawler

@Override
public void beforeBulk(long executionId, BulkRequest request) {
  LOG.debug("beforeBulk {} with {} actions", executionId,
      request.numberOfActions());
  eventCounter.scope("bulks_received").incrBy(1);
}

相关文章