本文整理了Java中kafka.message.MessageAndMetadata
类的一些代码示例,展示了MessageAndMetadata
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MessageAndMetadata
类的具体详情如下:
包路径:kafka.message.MessageAndMetadata
类名称:MessageAndMetadata
暂无
代码示例来源:origin: apache/incubator-gobblin
if (!iterator.hasNext()) {
return;
messagePlusMeta = iterator.next();
if (messagePlusMeta!=null) {
byte[] payload = messagePlusMeta.message();
System.out.println("Got a message of size " + payload.length + " bytes");
GenericRecord record = (GenericRecord) deserializer.deserialize(topic, payload);
System.out.println(record.toString());
checkpoint.update(messagePlusMeta.partition(), messagePlusMeta.offset());
代码示例来源:origin: pinterest/secor
/**
 * Reads the next record from the underlying Kafka iterator and converts it
 * into a {@code Message}.
 *
 * <p>A {@code ConsumerTimeoutException} raised by the iterator is rethrown
 * wrapped in a {@code LegacyConsumerTimeoutException}. The timestamp passed
 * to the {@code Message} is {@code 0L} unless the configuration asks for the
 * Kafka timestamp, in which case it is taken from the timestamp factory.
 *
 * @return the converted message
 */
@Override
public Message next() {
    final MessageAndMetadata<byte[], byte[]> record;
    try {
        record = mIterator.next();
    } catch (ConsumerTimeoutException timeout) {
        throw new LegacyConsumerTimeoutException(timeout);
    }

    // Only consult the timestamp factory when the config opts in.
    final long timestamp = mConfig.useKafkaTimestamp()
            ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(record)
            : 0L;

    return new Message(
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.message(),
            timestamp);
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Parses the payload of a Kafka message into job-spec changes and applies
 * them to the job catalog.
 *
 * <p>Each parsed element is either a left value (a {@code JobSpec} that is
 * put into the catalog, counted in {@code newSpecs}) or a right value (a
 * {@code URI} that is removed from the catalog, counted in
 * {@code removedSpecs}). An {@code IOException} is logged together with the
 * message offset and the payload decoded as UTF-8; it is not rethrown.
 *
 * @param message the Kafka record whose payload is parsed
 */
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
    try {
        final Collection<Either<JobSpec, URI>> changes = parseJobSpec(message.message());
        for (Either<JobSpec, URI> change : changes) {
            if (change instanceof Either.Left) {
                // Left side: a spec to add.
                final Either.Left<JobSpec, URI> addition = (Either.Left<JobSpec, URI>) change;
                this.newSpecs.inc();
                this.jobCatalog.put(addition.getLeft());
            } else if (change instanceof Either.Right) {
                // Right side: the URI of a spec to remove.
                final Either.Right<JobSpec, URI> removal = (Either.Right<JobSpec, URI>) change;
                this.removedSpecs.inc();
                this.jobCatalog.remove(removal.getRight());
            }
        }
    } catch (IOException parseFailure) {
        final String rawPayload = new String(message.message(), Charsets.UTF_8);
        final String errorText =
                String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), rawPayload);
        log.error(errorText, parseFailure);
    }
}
代码示例来源:origin: Graylog2/graylog2-server
while (consumerIterator.hasNext()) {
if (paused) {
final MessageAndMetadata<byte[], byte[]> message = consumerIterator.next();
final byte[] bytes = message.message();
代码示例来源:origin: apache/incubator-druid
/**
 * Returns the next parsed row, pulling and parsing a new Kafka message when
 * the current batch iterator is exhausted.
 *
 * @return the next row, or {@code null} when the fetched message has a null
 *         payload or fails its checksum
 */
@Nullable
@Override
public InputRow nextRow()
{
  try {
    // Fast path: rows from the previously parsed message are still pending.
    if (nextIterator.hasNext()) {
      return nextIterator.next();
    }

    final byte[] payload = iter.next().message();
    if (payload == null) {
      return null;
    }

    nextIterator = theParser.parseBatch(ByteBuffer.wrap(payload)).iterator();
    return nextIterator.next();
  }
  catch (InvalidMessageException e) {
    /*
    If the CRC error was introduced during the wire transfer, skipping is not the best way to handle it.
    It is probably better to shut down the firehose without committing and start it again.
    */
    log.error(e, "Message failed its checksum and it is corrupt, will skip it");
    return null;
  }
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Pushes two keyed messages through a {@code KafkaKeyValueProducerPusher}
 * and verifies that both arrive, in order, with the expected keys and
 * payloads.
 *
 * <p>The pusher is constructed with a dummy broker address and a scoped
 * config that points at the real test broker, so the test also checks that
 * the scoped config overrides the generic one.
 *
 * @throws IOException propagated from the pusher
 */
@Test
public void test() throws IOException {
    // Test that the scoped config overrides the generic config
    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
            Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));

    String msg1 = "msg1";
    String msg2 = "msg2";
    pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), Pair.of("key2", msg2.getBytes())));

    // Presumably gives the broker time to make the messages available.
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }

    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);

    // Use the test framework's assertion rather than the Java `assert`
    // keyword: the keyword is skipped unless the JVM runs with -ea, so the
    // check could never fail in a default test run.
    Assert.assertTrue(iterator.hasNext());
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
    Assert.assertEquals(new String(messageAndMetadata.key()), "key1");
    Assert.assertEquals(new String(messageAndMetadata.message()), msg1);

    Assert.assertTrue(iterator.hasNext());
    messageAndMetadata = iterator.next();
    Assert.assertEquals(new String(messageAndMetadata.key()), "key2");
    Assert.assertEquals(new String(messageAndMetadata.message()), msg2);

    pusher.close();
}
代码示例来源:origin: salesforce/Argus
/**
 * Consumes messages from the Kafka stream and queues each payload, decoded
 * as a string, on the message queue of its topic.
 *
 * <p>The loop ends when the stream iterator has no more elements or when the
 * current thread is found to be interrupted. Records with a null payload are
 * skipped. If queueing is interrupted, the interrupt flag is restored so the
 * check at the top of the loop can end the run.
 */
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> it = _stream.iterator();

    while (it.hasNext()) {
        Thread.yield();
        if (Thread.currentThread().isInterrupted()) {
            _logger.info("Interrupted... Will exit now.");
            break;
        }

        MessageAndMetadata<byte[], byte[]> m = it.next();

        // Check the raw payload: new String(byte[]) never returns null and
        // throws NullPointerException for a null array, which would have
        // terminated this consumer thread.
        byte[] payload = m.message();
        if (payload == null) {
            continue;
        }

        try {
            String message = new String(payload);
            String topic = m.topic();

            _topics.get(topic).getMessages().put(message);

            long c = count.incrementAndGet();

            if (c % 50000 == 0) {
                // Log the value this thread observed, not a later re-read of the counter.
                _logger.debug("Read {} messages.", c);
            }
            if (_topics.get(topic).getMessages().size() % 1000 == 0) {
                _logger.debug("Message queued. Queue size = {}", _topics.get(topic).getMessages().size());
            }
        } catch (InterruptedException ie) {
            _logger.debug("Interrupted while consuming message.");
            Thread.currentThread().interrupt();
        }
    }
}
}
代码示例来源:origin: apache/nifi
/**
 * Writes the Kafka message payload to the given stream, preceded by the
 * demarcator bytes for every message except the first.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void process(final OutputStream out) throws IOException {
    final boolean needsDemarcator = !firstMessage;
    if (needsDemarcator) {
        out.write(demarcatorBytes);
    }

    final byte[] payload = mam.message();
    out.write(payload);
}
});
代码示例来源:origin: hadooparchitecturebook/fraud-detection-tutorial
e = this.hasNext();
if(e) {
MessageAndMetadata endTime = this.it.next();
byte[] kafkaMessage = (byte[])endTime.message();
byte[] kafkaKey = (byte[])endTime.key();
HashMap headers = new HashMap();
headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
代码示例来源:origin: com.hurence.logisland/logisland-kafka-0-8-plugin
/**
 * Builds a {@code KeyedMessage} addressed to {@code topicName} that carries
 * the key and payload of the consumed record.
 *
 * @param messageAndMetadata the consumed record
 * @return a keyed message with the same key and payload
 */
@Override
public KeyedMessage<String, String> extract(MessageAndMetadata<String, String> messageAndMetadata) {
    // Diamond instead of a raw type, so the key/value types are checked at compile time.
    return new KeyedMessage<>(topicName, messageAndMetadata.key(), messageAndMetadata.message());
}
});
代码示例来源:origin: uk.camsw.rx/rx-test-kafka
/**
 * Formats a record as its payload text followed by {@code "@part"} and the
 * partition number.
 *
 * @param mamd the consumed record
 * @return the payload text and partition joined as {@code <payload>@part<partition>}
 */
public static String messageAndPartition(MessageAndMetadata<byte[], byte[]> mamd) {
    final String payloadText = asUtf8(mamd.message());
    final int partition = mamd.partition();
    return payloadText + "@part" + partition;
}
代码示例来源:origin: b/kafka-websocket
/**
 * Forwards every record from the Kafka stream to the websocket session.
 *
 * <p>Records are sent as binary when the negotiated subprotocol is
 * {@code "kafka-binary"} and as text otherwise. After each send the thread's
 * interrupt status is checked; if set, the session is closed and the method
 * returns.
 */
@Override
@SuppressWarnings("unchecked")
public void run() {
    final String subprotocol = session.getNegotiatedSubprotocol();
    final Iterable<MessageAndMetadata<byte[], byte[]>> records =
            (Iterable<MessageAndMetadata<byte[], byte[]>>) stream;

    for (MessageAndMetadata<byte[], byte[]> record : records) {
        final String topic = record.topic();
        final byte[] payload = record.message();

        switch (subprotocol) {
            case "kafka-binary":
                sendBinary(topic, payload);
                break;
            default:
                sendText(topic, payload);
                break;
        }

        if (!Thread.currentThread().isInterrupted()) {
            continue;
        }

        // Interrupted: close the session and stop consuming.
        try {
            session.close();
        } catch (IOException closeFailure) {
            LOG.error("Error terminating session: {}", closeFailure.getMessage());
        }
        return;
    }
}
代码示例来源:origin: apache/incubator-edgent
/**
 * Converts a consumed record into a tuple and submits it.
 *
 * <p>The string conversion function is used when it is set; otherwise the
 * byte conversion function is used. Any exception raised while converting or
 * submitting is logged with the record's topic and partition and is not
 * rethrown.
 *
 * @param rec the consumed record
 */
void accept(MessageAndMetadata<byte[],byte[]> rec) {
    try {
        trace.trace("{} received rec for topic:{} partition:{} offset:{}",
                id(), rec.topic(), rec.partition(), rec.offset());

        final T tuple;
        if (stringToTupleFn == null) {
            tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
        } else {
            tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
        }

        eventSubmitter.accept(tuple);
    }
    catch (Exception e) {
        final String topicPartition = String.format("[%s,%d]", rec.topic(), rec.partition());
        trace.error("{} failure processing record from {}", id(), topicPartition, e);
    }
}
代码示例来源:origin: apache/incubator-edgent
/** Returns the key of the wrapped record. */
@Override
public byte[] key() {
    return rec.key();
}
@Override
代码示例来源:origin: org.apache.edgent/edgent-connectors-kafka
/** Returns the offset of the wrapped record. */
@Override
public long offset() {
    return rec.offset();
}
}
代码示例来源:origin: apache/incubator-edgent
/** Returns the partition of the wrapped record. */
@Override
public int partition() {
    return rec.partition();
}
@Override
代码示例来源:origin: org.apache.edgent/edgent-connectors-kafka
/** Returns the topic of the wrapped record. */
@Override
public String topic() {
    return rec.topic();
}
@Override
代码示例来源:origin: apache/incubator-gobblin
/**
 * Pushes two messages through a {@code KafkaPusher} and verifies that both
 * arrive, in order, with the expected payloads.
 *
 * @throws IOException propagated from the pusher
 */
@Test
public void test() throws IOException {
    KafkaPusher pusher = new KafkaPusher("localhost:" + kafkaPort, TOPIC);

    String msg1 = "msg1";
    String msg2 = "msg2";
    pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));

    // Presumably gives the broker time to make the messages available.
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }

    // Use the test framework's assertion rather than the Java `assert`
    // keyword: the keyword is skipped unless the JVM runs with -ea, so the
    // check could never fail in a default test run.
    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(new String(iterator.next().message()), msg1);

    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(new String(iterator.next().message()), msg2);

    pusher.close();
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Consumes {@code totalSuccessful} records from the given topic and logs
 * each payload, decoded as UTF-8, at debug level.
 *
 * @param totalSuccessful the number of records to consume
 * @param topic the topic to read from
 * @throws UnsupportedEncodingException if the UTF-8 charset name is not supported
 */
private void testRecordsWritten(int totalSuccessful, String topic)
        throws UnsupportedEncodingException {
    final ConsumerIterator<byte[], byte[]> records = kafkaTestHelper.getIteratorForTopic(topic);

    // Count from 1 so the counter doubles as the "n of total" figure in the log line.
    for (int consumed = 1; consumed <= totalSuccessful; consumed++) {
        final byte[] payload = records.next().message();
        final String text = new String(payload, "UTF-8");
        log.debug(String.format("%d of %d: Message consumed: %s", consumed, totalSuccessful, text));
    }
}
代码示例来源:origin: stackoverflow.com
while (it.hasNext()) {
MessageAndMetadata messageAndMetadata = it.next();
try {
String key = (String) messageAndMetadata.key();
IndexedRecord value = (IndexedRecord) messageAndMetadata.message();
内容来源于网络,如有侵权,请联系作者删除!