org.apache.flume.Channel.take()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(3.4k)|赞(0)|评价(0)|浏览(162)

本文整理了Java中org.apache.flume.Channel.take()方法的一些代码示例,展示了Channel.take()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel.take()方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:take

Channel.take介绍

[英]Returns the next event from the channel if available. If the channel does not have any events available, this method must return null.

Note: This method must be invoked within an active Transaction boundary. Failure to do so can lead to unpredictable results.
[中]从通道返回下一个事件(如果可用)。如果通道没有任何可用事件,此方法必须返回null。
注意:此方法必须在活动事务边界内调用。不这样做可能导致不可预测的结果。

代码示例

代码示例来源:origin: apache/flume

@Override
 public Event call() {
  return channel.take();
 }
});

代码示例来源:origin: apache/flume

@Override
 public List<Event> call() {
  List<Event> events = new ArrayList<Event>(max);
  while (events.size() < max) {
   Event event = channel.take();
   if (event == null) {
    break;
   }
   events.add(event);
  }
  return events;
 }
});

代码示例来源:origin: apache/rocketmq-externals

long beginTime = System.currentTimeMillis();
while (true) {
  Event event = channel.take();
  if (event != null) {
    events.add(event);

代码示例来源:origin: apache/flume

Event event = ch.take();

代码示例来源:origin: apache/ignite

Event event = channel.take();

代码示例来源:origin: apache/flume

@Override
 public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  Event event = null;

  try {
   transaction.begin();
   event = channel.take();

   if (event != null) {
    if (logger.isInfoEnabled()) {
     logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
    }
   } else {
    // No event found, request back-off semantics from the sink runner
    result = Status.BACKOFF;
   }
   transaction.commit();
  } catch (Exception ex) {
   transaction.rollback();
   throw new EventDeliveryException("Failed to log event: " + event, ex);
  } finally {
   transaction.close();
  }

  return result;
 }
}

代码示例来源:origin: kaaproject/kaa

int sinkEventCount = 0;
for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
 event = channel.take();
 if (event == null) {
  if ((System.currentTimeMillis() - cacheCleanupStartInterval) >= cacheCleanupInterval) {

代码示例来源:origin: apache/flume

Event event = channel.take();
if (event == null) {
 if (i == 0) {

代码示例来源:origin: apache/flume

Event event = channel.take();
if (event == null) {
 if (i == 0) {

代码示例来源:origin: apache/flume

long batchStartTime = System.nanoTime();
for (; processedEvents < batchSize; processedEvents += 1) {
 event = channel.take();

代码示例来源:origin: apache/flume

int i = 0;
for (i = 0; i < batchSize; i++) {
 event = channel.take();
 if (++eventCounter % logEveryNEvents == 0) {
  logger.info("Null sink {} successful processed {} events.", getName(), eventCounter);

代码示例来源:origin: apache/flume

try {
 for (; i < batchSize; i++) {
  Event event = channel.take();
  if (event == null) {
   status = Status.BACKOFF;

代码示例来源:origin: apache/flume

for (; txnEventCount < batchSize; ++txnEventCount) {
 Event event = channel.take();
 if (event == null) {
  break;

代码示例来源:origin: apache/flume

createConnection();
Event event = channel.take();

代码示例来源:origin: apache/flume

Event event = channel.take();

代码示例来源:origin: apache/flume

Event event = myChannel.take();
if (event == null) {
 break;

代码示例来源:origin: apache/flume

int count;
for (count = 0; count < batchSize; ++count) {
 Event event = channel.take();

代码示例来源:origin: apache/flume

int eventAttemptCounter = 0;
for (int i = 0; i < batchSize; i++) {
 event = channel.take();
 if (event != null) {
  sinkCounter.incrementEventDrainAttemptCount();

代码示例来源:origin: apache/flume

Event event = channel.take();

代码示例来源:origin: apache/flume

int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
 Event event = channel.take();
 if (event == null) {
  break;

相关文章