Usage of the kafka.message.MessageAndMetadata.offset() method, with code examples


This article collects Java code examples showing how the kafka.message.MessageAndMetadata.offset() method is used in practice. The examples were extracted from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical references. Details of MessageAndMetadata.offset() follow:
Package: kafka.message.MessageAndMetadata
Class: MessageAndMetadata
Method: offset

About MessageAndMetadata.offset

offset() returns the position of the consumed message within the Kafka partition it was read from.
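As a quick orientation before the project examples, the sketch below shows the typical call site: iterating a KafkaStream from the old (0.8.x) high-level consumer and reading offset() together with topic(), partition(), and message() from each MessageAndMetadata. This is a minimal sketch, not code from any of the projects below; the topic name "my-topic", the group id, and the ZooKeeper address are placeholder assumptions to adjust for your environment.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class OffsetExample {
  public static void main(String[] args) {
    // Placeholder connection settings -- adjust for your environment.
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "offset-example");
    props.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Request one stream for the (hypothetical) topic "my-topic".
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        consumer.createMessageStreams(Collections.singletonMap("my-topic", 1));
    ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

    while (it.hasNext()) {
      MessageAndMetadata<byte[], byte[]> mam = it.next();
      // offset() is the message's position within its partition.
      System.out.printf("topic=%s partition=%d offset=%d payload=%s%n",
          mam.topic(), mam.partition(), mam.offset(), new String(mam.message()));
    }
  }
}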

Code examples

Code example origin: apache/incubator-gobblin

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 try {
  Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
  for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
   if (parsedMessage instanceof Either.Left) {
    this.newSpecs.inc();
    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
   } else if (parsedMessage instanceof Either.Right) {
    this.removedSpecs.inc();
    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
   }
  }
 } catch (IOException ioe) {
  String messageStr = new String(message.message(), Charsets.UTF_8);
  log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
 }
}

Code example origin: apache/incubator-gobblin

log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);

Code example origin: apache/incubator-gobblin

GenericRecord record = (GenericRecord) deserializer.deserialize(topic, payload);
System.out.println(record.toString());
checkpoint.update(messagePlusMeta.partition(), messagePlusMeta.offset());

Code example origin: apache/nifi

attributes.put("kafka.offset", String.valueOf(mam.offset()));
attributes.put("kafka.partition", String.valueOf(mam.partition()));

Code example origin: pinterest/secor

@Override
public Message next() {
  MessageAndMetadata<byte[], byte[]> kafkaMessage;
  try {
    kafkaMessage = mIterator.next();
  } catch (ConsumerTimeoutException e) {
    throw new LegacyConsumerTimeoutException(e);
  }
  long timestamp = 0L;
  if (mConfig.useKafkaTimestamp()) {
    timestamp = mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage);
  }
  return new Message(kafkaMessage.topic(), kafkaMessage.partition(),
      kafkaMessage.offset(), kafkaMessage.key(),
      kafkaMessage.message(), timestamp);
}

Code example origin: uber/chaperone

private void processAuditMsg(final MessageAndMetadata mm) throws Exception {
 JSONObject record = JSON.parseObject(StringUtils.toEncodedString((byte[]) mm.message(), Charset.forName("UTF-8")));
 String topicName = record.getString(AuditMsgField.TOPICNAME.getName());
 if (blacklistedTopics.contains(topicName)) {
  logger.debug("Topic={} is blacklisted", topicName);
  return;
 }
 if (deduplicator != null) {
  String uuid = record.getString(AuditMsgField.UUID.getName());
  String host = record.getString(AuditMsgField.HOSTNAME.getName());
  if (deduplicator.isDuplicated(topicName, mm.partition(), mm.offset(), host, uuid)) {
   return;
  }
 }
 if (enablePersistentStore) {
  auditReporter.submit(mm.topic(), mm.partition(), mm.offset(), record);
 }
}

Code example origin: org.apache.edgent/edgent-connectors-kafka

@Override
  public long offset() { return rec.offset(); }
}

Code example origin: apache/incubator-edgent

@Override
  public long offset() { return rec.offset(); }
}

Code example origin: uk.camsw.rx/rx-test-kafka

public static String messageAndOffset(MessageAndMetadata<byte[], byte[]> mamd) {
  return asUtf8(mamd.message()) + "@offset" + mamd.offset();
}

Code example origin: elodina/dropwizard-kafka-http

public Message(MessageAndMetadata<byte[], byte[]> message) {
    this.topic = message.topic();
    this.key = message.key() != null ? new String(message.key(), Charset.forName("utf-8")) : null;
    this.message = new String(message.message(), Charset.forName("utf-8"));
    this.partition = message.partition();
    this.offset = message.offset();
  }
}

Code example origin: org.apache.apex/malhar-contrib

public void run()
 {
  ConsumerIterator<byte[], byte[]> itr = stream.iterator();
  logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());
  while (itr.hasNext() && isAlive) {
   MessageAndMetadata<byte[], byte[]> mam = itr.next();
   try {
    kp.setPartitionId(mam.partition());
    putMessage(kp, new Message(mam.message()), mam.offset());
   } catch (InterruptedException e) {
    logger.error("Message Enqueue has been interrupted", e);
   }
  }
  logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
 }
});

Code example origin: apache/apex-malhar

public void run()
 {
  ConsumerIterator<byte[], byte[]> itr = stream.iterator();
  logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());
  while (itr.hasNext() && isAlive) {
   MessageAndMetadata<byte[], byte[]> mam = itr.next();
   try {
    kp.setPartitionId(mam.partition());
    putMessage(kp, new Message(mam.message()), mam.offset());
   } catch (InterruptedException e) {
    logger.error("Message Enqueue has been interrupted", e);
   }
  }
  logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
 }
});

Code example origin: com.linkedin.gobblin/gobblin-runtime

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 try {
  Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
  for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
   if (parsedMessage instanceof Either.Left) {
    this.newSpecs.inc();
    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
   } else if (parsedMessage instanceof Either.Right) {
    this.removedSpecs.inc();
    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
   }
  }
 } catch (IOException ioe) {
  String messageStr = new String(message.message(), Charsets.UTF_8);
  log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
 }
}

Code example origin: org.apache.gobblin/gobblin-runtime

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 try {
  Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
  for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
   if (parsedMessage instanceof Either.Left) {
    this.newSpecs.inc();
    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
   } else if (parsedMessage instanceof Either.Right) {
    this.removedSpecs.inc();
    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
   }
  }
 } catch (IOException ioe) {
  String messageStr = new String(message.message(), Charsets.UTF_8);
  log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
 }
}

Code example origin: vakinge/jeesuite-libs

@Override
  public void run() {
    try {	
      long start = logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
      messageHandler.p2Process(message);
      if(logger.isDebugEnabled()){
        long useTime = System.currentTimeMillis() - start;
        if(useTime > 1000)logger.debug("received_topic_useTime [{}]process topic:{} use time {} ms",processorName,topicName,useTime);
      }
      // acknowledgement
      if(message.isConsumerAckRequired()){
        consumerContext.sendConsumerAck(message.getMsgId());
      }
      consumerContext.saveOffsetsAfterProcessed(messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
    } catch (Exception e) {
      boolean processed = messageHandler.onProcessError(message);
      if(processed == false){
        consumerContext.processErrorMessage(topicName, message);
      }
      logger.error("received_topic_process_error ["+processorName+"]processMessage error,topic:"+topicName,e);
    }
    
    consumerContext.updateConsumerStats(messageAndMeta.topic(),-1);
  
  }
});

Code example origin: datasift/dropwizard-extra

/**
   * Processes a {@link Iterable} by iteratively processing each message.
   *
   * @param stream the stream of messages to process.
   * @param topic the topic the {@code stream} belongs to.
   *
   * @see StreamProcessor#process(Iterable, String)
   */
  public void process(final Iterable<MessageAndMetadata<K, V>> stream, final String topic) {
    for (final MessageAndMetadata<K, V> entry : stream) {
//            final Timer.Context context = processed.time();
      process(entry.key(), entry.message(), topic, entry.partition(), entry.offset());
//            context.stop();
    }
  }
}

Code example origin: mariamhakobyan/elasticsearch-river-kafka

/**
 * Consumes the messages from the partition via specified stream.
 */
private void consumeMessagesAndAddToBulkProcessor(final KafkaStream stream) {
  try {
    // by default it waits forever for message, but there is timeout configured
    final ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
    // Consume all the messages of the stream (partition)
    while (consumerIterator.hasNext() && consume) {
      final MessageAndMetadata messageAndMetadata = consumerIterator.next();
      logMessage(messageAndMetadata);
      elasticsearchProducer.addMessagesToBulkProcessor(messageAndMetadata);
      // StatsD reporting
      stats.messagesReceived.incrementAndGet();
      stats.lastCommitOffsetByPartitionId.put(messageAndMetadata.partition(), messageAndMetadata.offset());
    }
  } catch (ConsumerTimeoutException ex) {
    logger.debug("Nothing to be consumed for now. Consume flag is: {}", consume);
  }
}

Code example origin: apache/incubator-edgent

void accept(MessageAndMetadata<byte[],byte[]> rec) {
  try {
    trace.trace("{} received rec for topic:{} partition:{} offset:{}",
          id(), rec.topic(), rec.partition(), rec.offset());
    T tuple;
    if (stringToTupleFn != null)
      tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
    else
      tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
    eventSubmitter.accept(tuple);
  }
  catch (Exception e) {
    String tp = String.format("[%s,%d]", rec.topic(), rec.partition());
    trace.error("{} failure processing record from {}", id(), tp, e);
  }
}

Code example origin: org.apache.edgent/edgent-connectors-kafka

void accept(MessageAndMetadata<byte[],byte[]> rec) {
  try {
    trace.trace("{} received rec for topic:{} partition:{} offset:{}",
          id(), rec.topic(), rec.partition(), rec.offset());
    T tuple;
    if (stringToTupleFn != null)
      tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
    else
      tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
    eventSubmitter.accept(tuple);
  }
  catch (Exception e) {
    String tp = String.format("[%s,%d]", rec.topic(), rec.partition());
    trace.error("{} failure processing record from {}", id(), tp, e);
  }
}

Code example origin: com.github.hackerwin7/jlib-utils

public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
      try {
        // Read the record once; calling it.next() repeatedly would advance the
        // iterator and mix fields from different messages.
        MessageAndMetadata<byte[], byte[]> mam = it.next();
        KafkaMsg msg = KafkaMsg.createBuilder()
            .key(new String(mam.key()))
            .val(mam.message())
            .offset(mam.offset())
            .partition(mam.partition())
            .topic(mam.topic())
            .build();
        while (true) {//retry put
          try {
            queue.put(msg);
            break;
          } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            try {
              Thread.sleep(SLEEPING_INTERVAL);
            } catch (InterruptedException ee) {
              logger.error(e.getMessage(), e);
            }
          }
        }
      } catch (Throwable e) {
        logger.error(e.getMessage(), e);
      }
    }
  }
}
