本文整理了Java中org.elasticsearch.action.bulk.BulkRequest.numberOfActions()
方法的一些代码示例,展示了BulkRequest.numberOfActions()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。BulkRequest.numberOfActions()
方法的具体详情如下:
包路径:org.elasticsearch.action.bulk.BulkRequest
类名称:BulkRequest
方法名:numberOfActions
[英]The number of actions in the bulk request.
[中]批量请求中的操作数。
代码示例来源:origin: brianway/webporter
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
logger.info("bulk request numberOfActions:" + bulkRequest.numberOfActions());
}
代码示例来源:origin: richardwilly98/elasticsearch-river-mongodb
@Override
public void beforeBulk(long executionId, BulkRequest request) {
checkBulkProcessorAvailability();
logger.trace("beforeBulk - new bulk [{}] of items [{}]", executionId, request.numberOfActions());
if (flushBulkProcessor.get()) {
logger.trace("About to flush bulk request index[{}] - type[{}]", index, type);
int dropDollectionIndex = findLastDropCollection(request.requests());
request.requests().subList(0, dropDollectionIndex + 1).clear();
try {
dropRecreateMapping();
deletedDocuments.set(0);
updatedDocuments.set(0);
insertedDocuments.set(0);
flushBulkProcessor.set(false);
} catch (Throwable t) {
logger.error("Drop collection operation failed", t);
MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.IMPORT_FAILED);
request.requests().clear();
bulkProcessor.close();
river.close();
}
}
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
* The number of actions currently in the bulk.
*/
public int numberOfActions() {
return request.numberOfActions();
}
代码示例来源:origin: org.elasticsearch/elasticsearch
@Override
public void run() {
synchronized (BulkProcessor.this) {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute();
}
}
}
代码示例来源:origin: apache/flink
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
while (nextBulkRequest.numberOfActions() > 0) {
// wait until we are allowed to continue with the flushing
flushLatch.await();
// create a copy of the accumulated mock requests, so that
// re-added requests from the failure handler are included in the next bulk
BulkRequest currentBulkRequest = nextBulkRequest;
nextBulkRequest = new BulkRequest();
listener.beforeBulk(123L, currentBulkRequest);
if (nextBulkFailure == null) {
BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
for (int i = 0; i < currentBulkRequest.requests().size(); i++) {
Throwable mockItemFailure = mockItemFailuresList.get(i);
if (mockItemFailure == null) {
// the mock response for the item is success
mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class));
} else {
// the mock response for the item is failure
mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
}
}
listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L));
} else {
listener.afterBulk(123L, currentBulkRequest, nextBulkFailure);
}
}
return null;
}
}).when(mockBulkProcessor).flush();
代码示例来源:origin: dadoonet/fscrawler
@Override public void beforeBulk(long executionId, BulkRequest request) {
logger.trace("Sending a bulk request of [{}] requests", request.numberOfActions());
}
代码示例来源:origin: dadoonet/fscrawler
@Override public void beforeBulk(long executionId, BulkRequest request) {
logger.trace("Sending a bulk request of [{}] requests", request.numberOfActions());
}
代码示例来源:origin: dadoonet/fscrawler
@Override public void beforeBulk(long executionId, BulkRequest request) {
logger.trace("Sending a bulk request of [{}] requests", request.numberOfActions());
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
if (request.numberOfActions() > 0) {
BulkResponse response;
try {
代码示例来源:origin: org.elasticsearch/elasticsearch
private boolean isOverTheLimit() {
if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
return true;
}
if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
return true;
}
return false;
}
代码示例来源:origin: spring-projects/spring-data-elasticsearch
for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
DocWriteRequest<?> action = bulkRequest.requests().get(i);
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
* Flush pending delete or index requests.
*/
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
}
代码示例来源:origin: dadoonet/fscrawler
@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.trace("Executed bulk request with [{}] requests", request.numberOfActions());
if (response.hasFailures()) {
final int[] failures = {0};
response.iterator().forEachRemaining(bir -> {
if (bir.isFailed()) {
failures[0]++;
logger.debug("Error caught for [{}]/[{}]/[{}]: {}", bir.getIndex(),
bir.getType(), bir.getId(), bir.getFailureMessage());
}
});
logger.warn("Got [{}] failures of [{}] requests", failures[0], request.numberOfActions());
}
}
代码示例来源:origin: dadoonet/fscrawler
@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.trace("Executed bulk request with [{}] requests", request.numberOfActions());
if (response.hasFailures()) {
final int[] failures = {0};
response.iterator().forEachRemaining(bir -> {
if (bir.isFailed()) {
failures[0]++;
logger.debug("Error caught for [{}]/[{}]/[{}]: {}", bir.getIndex(),
bir.getType(), bir.getId(), bir.getFailureMessage());
}
});
logger.warn("Got [{}] failures of [{}] requests", failures[0], request.numberOfActions());
}
}
代码示例来源:origin: dadoonet/fscrawler
@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.trace("Executed bulk request with [{}] requests", request.numberOfActions());
if (response.hasFailures()) {
final int[] failures = {0};
response.iterator().forEachRemaining(bir -> {
if (bir.isFailed()) {
failures[0]++;
logger.debug("Error caught for [{}]/[{}]/[{}]: {}", bir.getIndex(),
bir.getType(), bir.getId(), bir.getFailureMessage());
}
});
logger.warn("Got [{}] failures of [{}] requests", failures[0], request.numberOfActions());
}
}
代码示例来源:origin: tomoya92/pybbs
public void bulkDocument(String type, Map<String, Map<String, Object>> sources) {
try {
if (this.instance() == null) return;
BulkRequest requests = new BulkRequest();
Iterator<String> it = sources.keySet().iterator();
int count = 0;
while(it.hasNext()) {
count++;
String next = it.next();
IndexRequest request = new IndexRequest(name, type, next);
request.source(sources.get(next));
requests.add(request);
if (count % 1000 == 0) {
client.bulk(requests, RequestOptions.DEFAULT);
requests.requests().clear();
count = 0;
}
}
if (requests.numberOfActions() > 0) client.bulk(requests, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error(e.getMessage());
}
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
* <p>
* If concurrent requests are not enabled, returns {@code true} immediately.
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true}
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
*
* @param timeout The maximum time to wait for the bulk requests to complete
* @param unit The time unit of the {@code timeout} argument
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests
* completed
* @throws InterruptedException If the current thread is interrupted
*/
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
}
closed = true;
this.cancellableFlushTask.cancel();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
try {
return this.bulkRequestHandler.awaitClose(timeout, unit);
} finally {
onClose.run();
}
}
代码示例来源:origin: tomoya92/pybbs
public void bulkDeleteDocument(String type, List<Integer> ids) {
try {
if (this.instance() == null) return;
BulkRequest requests = new BulkRequest();
int count = 0;
for (Integer id: ids) {
count++;
DeleteRequest request = new DeleteRequest(name, type, String.valueOf(id));
requests.add(request);
if (count % 1000 == 0) {
client.bulk(requests, RequestOptions.DEFAULT);
requests.requests().clear();
count = 0;
}
}
if (requests.numberOfActions() > 0) client.bulk(requests, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error(e.getMessage());
}
}
代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.elasticsearch
/**
* The number of actions currently in the bulk.
*/
public int numberOfActions() {
return request.numberOfActions();
}
}
代码示例来源:origin: DigitalPebble/storm-crawler
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.debug("beforeBulk {} with {} actions", executionId,
request.numberOfActions());
eventCounter.scope("bulks_received").incrBy(1);
}
内容来源于网络,如有侵权,请联系作者删除!