Usage of the kafka.message.MessageAndMetadata.message() method, with code examples


This article compiles Java code examples of the kafka.message.MessageAndMetadata.message() method and shows how it is used in practice. The examples were extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of MessageAndMetadata.message() are as follows:

Package path: kafka.message.MessageAndMetadata
Class name: MessageAndMetadata
Method name: message

About MessageAndMetadata.message

MessageAndMetadata<K, V> represents a single record consumed through Kafka's legacy (0.8/0.9-era) high-level consumer; message() returns the record's decoded value of type V, while key(), topic(), partition(), and offset() expose the record's key and metadata.
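
As a quick orientation before the project excerpts below, here is a minimal sketch of how message() is typically reached through the legacy high-level consumer. It is illustrative only: the ZooKeeper address, consumer group id, and topic name ("demo") are assumptions, and the kafka.consumer API shown here shipped with Kafka 0.8/0.9 and has been removed from later Kafka releases.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MessageAndMetadataExample {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper address
  props.put("group.id", "demo-group");              // assumed consumer group
  props.put("auto.offset.reset", "smallest");
  ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

  // Ask for one stream for the (assumed) topic "demo"
  Map<String, List<KafkaStream<byte[], byte[]>>> streams =
    connector.createMessageStreams(Collections.singletonMap("demo", 1));
  ConsumerIterator<byte[], byte[]> it = streams.get("demo").get(0).iterator();

  while (it.hasNext()) {
   MessageAndMetadata<byte[], byte[]> mam = it.next();
   // message() yields the decoded value; the other accessors carry the metadata
   System.out.printf("%s[%d]@%d: %s%n",
     mam.topic(), mam.partition(), mam.offset(),
     new String(mam.message(), StandardCharsets.UTF_8));
  }
 }
}

With the default decoders the stream is typed as byte[] keys and values, which is why most of the examples below either wrap message() in new String(...) or hand the bytes to a project-specific deserializer.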

Code examples

Code example source: apache/nifi

@Override
  public void process(final OutputStream out) throws IOException {
    // Write the configured demarcator between messages, then the raw Kafka payload
    if (!firstMessage) {
      out.write(demarcatorBytes);
    }
    out.write(mam.message());
  }
});

Code example source: apache/incubator-druid

@Nullable
@Override
public InputRow nextRow()
{
 try {
  if (!nextIterator.hasNext()) {
   final byte[] message = iter.next().message();
   if (message == null) {
    return null;
   }
   nextIterator = theParser.parseBatch(ByteBuffer.wrap(message)).iterator();
  }
  return nextIterator.next();
 }
 catch (InvalidMessageException e) {
  /*
  If the CRC error was introduced during wire transfer, this is not the best way to handle it.
  It would probably be better to shut down the firehose without committing and start it again.
   */
  log.error(e, "Message failed its checksum and it is corrupt, will skip it");
  return null;
 }
}

Code example source: apache/incubator-gobblin

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 this.messages.add(new String(message.message(), Charsets.UTF_8));
}

Code example source: apache/incubator-gobblin

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 try {
  Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
  for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
   if (parsedMessage instanceof Either.Left) {
    this.newSpecs.inc();
    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
   } else if (parsedMessage instanceof Either.Right) {
    this.removedSpecs.inc();
    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
   }
  }
 } catch (IOException ioe) {
  String messageStr = new String(message.message(), Charsets.UTF_8);
  log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
 }
}

Code example source: apache/incubator-gobblin

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 try {
  Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
  for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
   JobSpec jobSpec = ((Either.Left<JobSpec, URI>) parsedMessage).getLeft();
   // ... handling of the parsed job spec is elided in the original excerpt ...
  }
 } catch (IOException ioe) {
  String messageStr = new String(message.message(), Charsets.UTF_8);
  log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
 }
}

Code example source: apache/incubator-gobblin

private void testRecordsWritten(int totalSuccessful, String topic)
  throws UnsupportedEncodingException {
 final ConsumerIterator<byte[], byte[]> iterator = kafkaTestHelper.getIteratorForTopic(topic);
 for (int i = 0; i < totalSuccessful; ++i) {
  String message = new String(iterator.next().message(), "UTF-8");
  log.debug(String.format("%d of %d: Message consumed: %s", (i+1), totalSuccessful, message));
 }
}

Code example source: apache/incubator-gobblin

byte[] payload = messagePlusMeta.message();
System.out.println("Got a message of size " + payload.length + " bytes");
GenericRecord record = (GenericRecord) deserializer.deserialize(topic, payload);

Code example source: Graylog2/graylog2-server

final byte[] bytes = message.message();

Code example source: apache/incubator-gobblin

@Test
public void test() throws IOException {
 KafkaPusher pusher = new KafkaPusher("localhost:" + kafkaPort, TOPIC);
 String msg1 = "msg1";
 String msg2 = "msg2";
 pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
 try {
  Thread.sleep(1000);
 } catch(InterruptedException ex) {
  Thread.currentThread().interrupt();
 }
 assert(iterator.hasNext());
 Assert.assertEquals(new String(iterator.next().message()), msg1);
 assert(iterator.hasNext());
 Assert.assertEquals(new String(iterator.next().message()), msg2);
 pusher.close();
}

Code example source: apache/incubator-pinot

if (kafkaIterator.hasNext()) {
 try {
  destination = _messageDecoder.decode(kafkaIterator.next().message(), destination);
  tableAndStreamRowsConsumed = _serverMetrics
    .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L,

Code example source: apache/incubator-gobblin

@Test
public void test() throws IOException {
 // Test that the scoped config overrides the generic config
 Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
   ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
 String msg1 = "msg1";
 String msg2 = "msg2";
 pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
 try {
  Thread.sleep(1000);
 } catch(InterruptedException ex) {
  Thread.currentThread().interrupt();
 }
 ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 assert(iterator.hasNext());
 Assert.assertEquals(new String(iterator.next().message()), msg1);
 assert(iterator.hasNext());
 Assert.assertEquals(new String(iterator.next().message()), msg2);
 pusher.close();
}

Code example source: apache/incubator-gobblin

@Test
public void test() throws IOException {
 // Test that the scoped config overrides the generic config
 Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
   Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
 String msg1 = "msg1";
 String msg2 = "msg2";
 pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), Pair.of("key2", msg2.getBytes())));
 try {
  Thread.sleep(1000);
 } catch(InterruptedException ex) {
  Thread.currentThread().interrupt();
 }
 ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 assert(iterator.hasNext());
 MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
 Assert.assertEquals(new String(messageAndMetadata.key()), "key1");
 Assert.assertEquals(new String(messageAndMetadata.message()), msg1);
 assert(iterator.hasNext());
 messageAndMetadata = iterator.next();
 Assert.assertEquals(new String(messageAndMetadata.key()), "key2");
 Assert.assertEquals(new String(messageAndMetadata.message()), msg2);
 pusher.close();
}

Code example source: apache/incubator-gobblin

@Test
public void testStringSerialization()
  throws IOException, InterruptedException {
 String topic = "testStringSerialization08";
 _kafkaTestHelper.provisionTopic(topic);
 Properties props = new Properties();
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 Kafka08DataWriter<String> kafka08DataWriter = new Kafka08DataWriter<String>(props);
 String messageString = "foobar";
 WriteCallback callback = mock(WriteCallback.class);
 try {
  kafka08DataWriter.write(messageString, callback);
 }
 finally
 {
  kafka08DataWriter.close();
 }
 verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
 verify(callback, never()).onFailure(isA(Exception.class));
 byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
 String messageReceived = new String(message);
 Assert.assertEquals(messageReceived, messageString);
}

Code example source: apache/incubator-gobblin

@Test
public void testStringSerialization()
  throws IOException, InterruptedException, ExecutionException {
 String topic = "testStringSerialization08";
 _kafkaTestHelper.provisionTopic(topic);
 Properties props = new Properties();
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
 String messageString = "foobar";
 WriteCallback callback = mock(WriteCallback.class);
 Future<WriteResponse> future;
 try {
  future = kafka09DataWriter.write(messageString, callback);
  kafka09DataWriter.flush();
  verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
  verify(callback, never()).onFailure(isA(Exception.class));
  Assert.assertTrue(future.isDone(), "Future should be done");
  System.out.println(future.get().getStringResponse());
  byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
  String messageReceived = new String(message);
  Assert.assertEquals(messageReceived, messageString);
 }
 finally
 {
  kafka09DataWriter.close();
 }
}

Code example source: apache/incubator-gobblin

@Test
public void testBinarySerialization()
  throws IOException, InterruptedException {
 String topic = "testBinarySerialization08";
 _kafkaTestHelper.provisionTopic(topic);
 Properties props = new Properties();
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
 Kafka09DataWriter<byte[]> kafka09DataWriter = new Kafka09DataWriter<byte[]>(props);
 WriteCallback callback = mock(WriteCallback.class);
 byte[] messageBytes = TestUtils.generateRandomBytes();
 try {
  kafka09DataWriter.write(messageBytes, callback);
 }
 finally
 {
  kafka09DataWriter.close();
 }
 verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
 verify(callback, never()).onFailure(isA(Exception.class));
 byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
 Assert.assertEquals(message, messageBytes);
}

Code example source: apache/incubator-gobblin

@Test
public void testBinarySerialization()
  throws IOException, InterruptedException {
 String topic = "testBinarySerialization08";
 _kafkaTestHelper.provisionTopic(topic);
 Properties props = new Properties();
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", "localhost:" + _kafkaTestHelper.getKafkaServerPort());
 props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
 Kafka08DataWriter<byte[]> kafka08DataWriter = new Kafka08DataWriter<byte[]>(props);
 WriteCallback callback = mock(WriteCallback.class);
 byte[] messageBytes = TestUtils.generateRandomBytes();
 try {
  kafka08DataWriter.write(messageBytes, callback);
 }
 finally
 {
  kafka08DataWriter.close();
 }
 verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
 verify(callback, never()).onFailure(isA(Exception.class));
 byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
 Assert.assertEquals(message, messageBytes);
}

Code example source: apache/incubator-gobblin

verify(callback, never()).onFailure(isA(Exception.class));
byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);

Code example source: pinterest/secor

@Override
public Message next() {
  MessageAndMetadata<byte[], byte[]> kafkaMessage;
  try {
    kafkaMessage = mIterator.next();
  } catch (ConsumerTimeoutException e) {
    throw new LegacyConsumerTimeoutException(e);
  }
  long timestamp = 0L;
  if (mConfig.useKafkaTimestamp()) {
    timestamp = mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage);
  }
  return new Message(kafkaMessage.topic(), kafkaMessage.partition(),
      kafkaMessage.offset(), kafkaMessage.key(),
      kafkaMessage.message(), timestamp);
}

Code example source: uber/chaperone

private void processAuditMsg(final MessageAndMetadata mm) throws Exception {
 JSONObject record = JSON.parseObject(StringUtils.toEncodedString((byte[]) mm.message(), Charset.forName("UTF-8")));
 String topicName = record.getString(AuditMsgField.TOPICNAME.getName());
 if (blacklistedTopics.contains(topicName)) {
  logger.debug("Topic={} is blacklisted", topicName);
  return;
 }
 if (deduplicator != null) {
  String uuid = record.getString(AuditMsgField.UUID.getName());
  String host = record.getString(AuditMsgField.HOSTNAME.getName());
  if (deduplicator.isDuplicated(topicName, mm.partition(), mm.offset(), host, uuid)) {
   return;
  }
 }
 if (enablePersistentStore) {
  auditReporter.submit(mm.topic(), mm.partition(), mm.offset(), record);
 }
}
