本文整理了Java中kafka.message.MessageAndMetadata.offset()
方法的一些代码示例,展示了MessageAndMetadata.offset()
的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MessageAndMetadata.offset()
方法的具体详情如下:
包路径:kafka.message.MessageAndMetadata
类名称:MessageAndMetadata
方法名:offset
暂无
代码示例来源:origin: apache/incubator-gobblin
/**
 * Applies the job-spec changes carried by one Kafka message to the job catalog.
 *
 * <p>The payload is handed to {@code parseJobSpec}; every parsed entry that is an
 * {@code Either.Left} is put into the catalog and counted in {@code newSpecs}, every
 * {@code Either.Right} is removed from the catalog and counted in {@code removedSpecs}.
 * If parsing fails with an {@link IOException}, the message offset and the payload
 * (decoded as UTF-8) are logged and the message is otherwise ignored.
 *
 * @param message the Kafka record whose payload describes the spec changes
 */
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
  try {
    for (Either<JobSpec, URI> change : parseJobSpec(message.message())) {
      if (change instanceof Either.Left) {
        // Left side: a job spec to add.
        this.newSpecs.inc();
        Either.Left<JobSpec, URI> addition = (Either.Left<JobSpec, URI>) change;
        this.jobCatalog.put(addition.getLeft());
        continue;
      }
      if (change instanceof Either.Right) {
        // Right side: the URI of a job spec to remove.
        this.removedSpecs.inc();
        Either.Right<JobSpec, URI> removal = (Either.Right<JobSpec, URI>) change;
        this.jobCatalog.remove(removal.getRight());
      }
    }
  } catch (IOException ioe) {
    String messageStr = new String(message.message(), Charsets.UTF_8);
    String description =
        String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr);
    log.error(description, ioe);
  }
}
代码示例来源:origin: apache/incubator-gobblin
log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
代码示例来源:origin: apache/incubator-gobblin
GenericRecord record = (GenericRecord) deserializer.deserialize(topic, payload);
System.out.println(record.toString());
checkpoint.update(messagePlusMeta.partition(), messagePlusMeta.offset());
代码示例来源:origin: apache/nifi
attributes.put("kafka.offset", String.valueOf(mam.offset()));
attributes.put("kafka.partition", String.valueOf(mam.partition()));
代码示例来源:origin: pinterest/secor
/**
 * Pulls the next record from the underlying Kafka iterator and converts it to a {@code Message}.
 *
 * <p>A {@code ConsumerTimeoutException} from the iterator is rethrown wrapped in a
 * {@code LegacyConsumerTimeoutException}. The timestamp is taken from the configured
 * timestamp factory only when {@code mConfig.useKafkaTimestamp()} is true; otherwise it is 0.
 *
 * @return a {@code Message} carrying topic, partition, offset, key, payload and timestamp
 */
@Override
public Message next() {
  final MessageAndMetadata<byte[], byte[]> fetched;
  try {
    fetched = mIterator.next();
  } catch (ConsumerTimeoutException e) {
    throw new LegacyConsumerTimeoutException(e);
  }

  final long timestamp = mConfig.useKafkaTimestamp()
      ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(fetched)
      : 0L;

  return new Message(
      fetched.topic(),
      fetched.partition(),
      fetched.offset(),
      fetched.key(),
      fetched.message(),
      timestamp);
}
代码示例来源:origin: uber/chaperone
/**
 * Handles one audit message: decodes the payload as UTF-8 JSON, drops it if its topic is
 * blacklisted or if the deduplicator reports it as a duplicate, and otherwise submits it to
 * the audit reporter when the persistent store is enabled.
 *
 * @param mm the Kafka record; its payload is cast to {@code byte[]}
 * @throws Exception propagated from the calls made while processing
 */
private void processAuditMsg(final MessageAndMetadata mm) throws Exception {
  final String json = StringUtils.toEncodedString((byte[]) mm.message(), Charset.forName("UTF-8"));
  final JSONObject record = JSON.parseObject(json);
  final String topicName = record.getString(AuditMsgField.TOPICNAME.getName());

  if (blacklistedTopics.contains(topicName)) {
    logger.debug("Topic={} is blacklisted", topicName);
    return;
  }

  // Deduplication is optional: it only runs when a deduplicator is present.
  if (deduplicator != null) {
    final String uuid = record.getString(AuditMsgField.UUID.getName());
    final String host = record.getString(AuditMsgField.HOSTNAME.getName());
    final boolean seenBefore =
        deduplicator.isDuplicated(topicName, mm.partition(), mm.offset(), host, uuid);
    if (seenBefore) {
      return;
    }
  }

  if (!enablePersistentStore) {
    return;
  }
  auditReporter.submit(mm.topic(), mm.partition(), mm.offset(), record);
}
代码示例来源:origin: org.apache.edgent/edgent-connectors-kafka
/**
 * Returns the offset reported by the wrapped record {@code rec}.
 */
@Override
public long offset() {
  return rec.offset();
}
}
代码示例来源:origin: apache/incubator-edgent
/**
 * Delegates to {@code rec.offset()} and returns its result unchanged.
 */
@Override
public long offset() {
  return rec.offset();
}
}
代码示例来源:origin: uk.camsw.rx/rx-test-kafka
/**
 * Renders a record as {@code <payload>@offset<offset>}, where the payload is converted
 * by {@code asUtf8}.
 *
 * @param mamd the Kafka record to describe
 * @return the payload text followed by {@code "@offset"} and the record's offset
 */
public static String messageAndOffset(MessageAndMetadata<byte[], byte[]> mamd) {
  final String payloadText = asUtf8(mamd.message());
  final long position = mamd.offset();
  return payloadText + "@offset" + position;
}
代码示例来源:origin: elodina/dropwizard-kafka-http
public Message(MessageAndMetadata<byte[], byte[]> message) {
this.topic = message.topic();
this.key = message.key() != null ? new String(message.key(), Charset.forName("utf-8")) : null;
this.message = new String(message.message(), Charset.forName("utf-8"));
this.partition = message.partition();
this.offset = message.offset();
}
}
代码示例来源:origin: org.apache.apex/malhar-contrib
/**
 * Consumes records from {@code stream} for as long as the iterator has more records and
 * {@code isAlive} is true, handing each payload and its offset to {@code putMessage}.
 */
public void run()
{
  final ConsumerIterator<byte[], byte[]> records = stream.iterator();
  logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());

  while (records.hasNext() && isAlive) {
    final MessageAndMetadata<byte[], byte[]> current = records.next();
    try {
      // Record which partition the message came from before enqueueing it.
      kp.setPartitionId(current.partition());
      final Message wrapped = new Message(current.message());
      putMessage(kp, wrapped, current.offset());
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt is only logged; the loop carries on with the next record.
      logger.error("Message Enqueue has been interrupted", e);
    }
  }

  logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
}
});
代码示例来源:origin: apache/apex-malhar
/**
 * Reads the Kafka stream until it is exhausted or {@code isAlive} turns false, tagging
 * {@code kp} with each record's partition and passing the payload plus offset to
 * {@code putMessage}.
 */
public void run()
{
  final ConsumerIterator<byte[], byte[]> source = stream.iterator();
  logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());

  while (source.hasNext() && isAlive) {
    final MessageAndMetadata<byte[], byte[]> item = source.next();
    try {
      kp.setPartitionId(item.partition());
      final Message body = new Message(item.message());
      putMessage(kp, body, item.offset());
    } catch (InterruptedException e) {
      // NOTE(review): interruption is logged and consumption continues with the next record.
      logger.error("Message Enqueue has been interrupted", e);
    }
  }

  logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
}
});
代码示例来源:origin: com.linkedin.gobblin/gobblin-runtime
/**
 * Parses the payload of a Kafka message into job-spec changes and applies each one.
 *
 * <p>An {@code Either.Left} entry is added to the job catalog and counted in
 * {@code newSpecs}; an {@code Either.Right} entry is removed from the catalog and counted
 * in {@code remmovedSpecs}. An {@link IOException} during parsing is logged together with
 * the message offset and the UTF-8 decoded payload.
 *
 * @param message the Kafka record to process
 */
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
  try {
    for (Either<JobSpec, URI> change : parseJobSpec(message.message())) {
      if (change instanceof Either.Left) {
        this.newSpecs.inc();
        Either.Left<JobSpec, URI> added = (Either.Left<JobSpec, URI>) change;
        this.jobCatalog.put(added.getLeft());
        continue;
      }
      if (change instanceof Either.Right) {
        // NOTE(review): "remmovedSpecs" is the identifier used here; the field is declared
        // outside this snippet, so the spelling is kept as is.
        this.remmovedSpecs.inc();
        Either.Right<JobSpec, URI> dropped = (Either.Right<JobSpec, URI>) change;
        this.jobCatalog.remove(dropped.getRight());
      }
    }
  } catch (IOException ioe) {
    String messageStr = new String(message.message(), Charsets.UTF_8);
    String description =
        String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr);
    log.error(description, ioe);
  }
}
代码示例来源:origin: org.apache.gobblin/gobblin-runtime
/**
 * Turns one Kafka message into job catalog updates.
 *
 * <p>Each entry returned by {@code parseJobSpec} is either a spec to add
 * ({@code Either.Left}, counted in {@code newSpecs}) or a URI to remove
 * ({@code Either.Right}, counted in {@code removedSpecs}). When parsing throws an
 * {@link IOException}, the failure is logged with the message offset and the payload
 * decoded as UTF-8.
 *
 * @param message the Kafka record to process
 */
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
  try {
    Collection<Either<JobSpec, URI>> changes = parseJobSpec(message.message());
    for (Either<JobSpec, URI> change : changes) {
      boolean isAddition = change instanceof Either.Left;
      boolean isRemoval = change instanceof Either.Right;
      if (isAddition) {
        this.newSpecs.inc();
        this.jobCatalog.put(((Either.Left<JobSpec, URI>) change).getLeft());
      } else if (isRemoval) {
        this.removedSpecs.inc();
        this.jobCatalog.remove(((Either.Right<JobSpec, URI>) change).getRight());
      }
    }
  } catch (IOException ioe) {
    String messageStr = new String(message.message(), Charsets.UTF_8);
    log.error(
        String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr),
        ioe);
  }
}
代码示例来源:origin: vakinge/jeesuite-libs
/**
 * Processes one message through {@code messageHandler}, sends a consumer acknowledgement
 * when the message requires one, and records the offset as processed. If anything in that
 * sequence throws, the handler's error hook runs and, when it reports the message as not
 * handled, the message is passed to {@code consumerContext.processErrorMessage}. Consumer
 * stats are updated at the end in every case that reaches it.
 */
@Override
public void run() {
  try {
    long startedAt = 0;
    if (logger.isDebugEnabled()) {
      startedAt = System.currentTimeMillis();
    }

    messageHandler.p2Process(message);

    if (logger.isDebugEnabled()) {
      long useTime = System.currentTimeMillis() - startedAt;
      // Only processing that took longer than one second is reported.
      if (useTime > 1000) {
        logger.debug("received_topic_useTime [{}]process topic:{} use time {} ms", processorName, topicName, useTime);
      }
    }

    // Acknowledgement receipt back to the producer side, when requested.
    if (message.isConsumerAckRequired()) {
      consumerContext.sendConsumerAck(message.getMsgId());
    }

    consumerContext.saveOffsetsAfterProcessed(
        messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
  } catch (Exception e) {
    boolean processed = messageHandler.onProcessError(message);
    if (!processed) {
      consumerContext.processErrorMessage(topicName, message);
    }
    logger.error("received_topic_process_error ["+processorName+"]processMessage error,topic:"+topicName,e);
  }

  consumerContext.updateConsumerStats(messageAndMeta.topic(), -1);
}
});
代码示例来源:origin: datasift/dropwizard-extra
/**
 * Walks the given stream and forwards every message, one at a time, to the
 * per-message {@code process} overload along with its key, topic, partition and offset.
 *
 * @param stream the stream of messages to process.
 * @param topic the topic the {@code stream} belongs to.
 *
 * @see StreamProcessor#process(Iterable, String)
 */
public void process(final Iterable<MessageAndMetadata<K, V>> stream, final String topic) {
  // Per-message timing via the `processed` timer is currently disabled in this method.
  for (final MessageAndMetadata<K, V> record : stream) {
    final K key = record.key();
    final V payload = record.message();
    process(key, payload, topic, record.partition(), record.offset());
  }
}
}
代码示例来源:origin: mariamhakobyan/elasticsearch-river-kafka
/**
 * Reads messages from the given stream while the {@code consume} flag is set, logging each
 * one, handing it to the Elasticsearch producer's bulk processor and updating the stats.
 * A {@code ConsumerTimeoutException} ends the loop and is reported at debug level only.
 */
private void consumeMessagesAndAddToBulkProcessor(final KafkaStream stream) {
  try {
    // Waiting for a message is bounded by the configured consumer timeout.
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

    // Drain the stream (partition) until it is exhausted or consumption is switched off.
    while (messages.hasNext() && consume) {
      final MessageAndMetadata received = messages.next();

      logMessage(received);
      elasticsearchProducer.addMessagesToBulkProcessor(received);

      // StatsD reporting: message count and the latest offset seen per partition.
      stats.messagesReceived.incrementAndGet();
      stats.lastCommitOffsetByPartitionId.put(received.partition(), received.offset());
    }
  } catch (ConsumerTimeoutException ex) {
    logger.debug("Nothing to be consumed for now. Consume flag is: {}", consume);
  }
}
代码示例来源:origin: apache/incubator-edgent
/**
 * Converts a received Kafka record into a tuple and submits it.
 *
 * <p>The string-based conversion function is used when it is set; otherwise the byte-based
 * one is used. Any exception raised while tracing, converting or submitting is caught and
 * logged with the record's topic and partition.
 *
 * @param rec the Kafka record to convert and submit
 */
void accept(MessageAndMetadata<byte[],byte[]> rec) {
  try {
    trace.trace("{} received rec for topic:{} partition:{} offset:{}",
        id(), rec.topic(), rec.partition(), rec.offset());

    final T tuple = (stringToTupleFn != null)
        ? stringToTupleFn.apply(new StringConsumerRecord(rec))
        : byteToTupleFn.apply(new ByteConsumerRecord(rec));

    eventSubmitter.accept(tuple);
  } catch (Exception e) {
    final String topicPartition = String.format("[%s,%d]", rec.topic(), rec.partition());
    trace.error("{} failure processing record from {}", id(), topicPartition, e);
  }
}
代码示例来源:origin: org.apache.edgent/edgent-connectors-kafka
/**
 * Traces the incoming record, maps it to a tuple with whichever conversion function is
 * configured (string-based if present, byte-based otherwise) and passes the tuple to
 * {@code eventSubmitter}. Failures are caught and logged with topic and partition.
 *
 * @param rec the Kafka record to handle
 */
void accept(MessageAndMetadata<byte[],byte[]> rec) {
  try {
    trace.trace("{} received rec for topic:{} partition:{} offset:{}",
        id(), rec.topic(), rec.partition(), rec.offset());

    T tuple;
    if (stringToTupleFn == null) {
      tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
    } else {
      tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
    }
    eventSubmitter.accept(tuple);
  } catch (Exception e) {
    String origin = String.format("[%s,%d]", rec.topic(), rec.partition());
    trace.error("{} failure processing record from {}", id(), origin, e);
  }
}
代码示例来源:origin: com.github.hackerwin7/jlib-utils
/**
 * Consumes the Kafka stream and enqueues one {@code KafkaMsg} per consumed record.
 *
 * <p>Each record is fetched from the iterator exactly once and all of its fields (key,
 * value, offset, partition, topic) are read from that single record. Enqueueing is retried
 * until it succeeds; an interruption while enqueueing is logged and followed by a pause of
 * {@code SLEEPING_INTERVAL} before the next attempt. Any other failure while handling a
 * record is logged and the loop moves on to the next record.
 */
public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        try {
            // Advance the iterator once per message. Calling next() per field would pull
            // five different records and stitch their fields into a single KafkaMsg.
            kafka.message.MessageAndMetadata<byte[], byte[]> record = it.next();
            KafkaMsg msg = KafkaMsg.createBuilder()
                    .key(new String(record.key()))
                    .val(record.message())
                    .offset(record.offset())
                    .partition(record.partition())
                    .topic(record.topic())
                    .build();
            while (true) { // retry put
                try {
                    queue.put(msg);
                    break;
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                    try {
                        Thread.sleep(SLEEPING_INTERVAL);
                    } catch (InterruptedException ee) {
                        // Log the interruption of the sleep itself, not the earlier one.
                        logger.error(ee.getMessage(), ee);
                    }
                }
            }
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }
    }
}
}
内容来源于网络,如有侵权,请联系作者删除!