本文整理了 Java 中 kafka.message.MessageAndMetadata.message() 方法的一些代码示例,展示了 MessageAndMetadata.message() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MessageAndMetadata.message() 方法的具体详情如下:
包路径:kafka.message.MessageAndMetadata
类名称:MessageAndMetadata
方法名:message
暂无
代码示例来源:origin: apache/nifi
/**
 * Writes the payload of {@code mam} to the given stream. For every message except
 * the first one, the demarcator bytes are written ahead of the payload.
 *
 * @param out the stream receiving the demarcator (if any) and the message bytes
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void process(final OutputStream out) throws IOException {
  final boolean needsDemarcator = !firstMessage;
  if (needsDemarcator) {
    out.write(demarcatorBytes);
  }
  out.write(mam.message());
}
});
代码示例来源:origin: apache/incubator-druid
/**
 * Returns the next parsed row. When the current batch iterator is exhausted, the next
 * Kafka message is fetched and parsed into a new batch first.
 *
 * @return the next row, or {@code null} when the fetched message has a {@code null}
 *         payload or an {@code InvalidMessageException} is raised
 */
@Nullable
@Override
public InputRow nextRow()
{
  try {
    // Fast path: the current batch still has rows to hand out.
    if (nextIterator.hasNext()) {
      return nextIterator.next();
    }

    final byte[] payload = iter.next().message();
    if (payload == null) {
      return null;
    }
    nextIterator = theParser.parseBatch(ByteBuffer.wrap(payload)).iterator();
    return nextIterator.next();
  }
  catch (InvalidMessageException e) {
    /*
     If the CRC failure was caused during wire transfer, this is not the best way to
     handle it. It is probably better to shut down the firehose without committing
     and start it again.
     */
    log.error(e, "Message failed its checksum and it is corrupt, will skip it");
    return null;
  }
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Decodes the message payload as UTF-8 text and appends it to {@code messages}.
 *
 * @param message the consumed Kafka message with its metadata
 */
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
  final byte[] payload = message.message();
  final String decoded = new String(payload, Charsets.UTF_8);
  this.messages.add(decoded);
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Parses the message payload into job-spec entries and applies each one to the job
 * catalog: a {@code Left} entry is put into the catalog, a {@code Right} entry is
 * removed from it. The matching counter is incremented before each catalog call.
 *
 * <p>An {@link IOException} raised inside the try block is logged together with the
 * message offset and the UTF-8 decoded payload; it is not rethrown.
 *
 * @param message the consumed Kafka message with its metadata
 */
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
  try {
    for (Either<JobSpec, URI> entry : parseJobSpec(message.message())) {
      if (entry instanceof Either.Left) {
        this.newSpecs.inc();
        this.jobCatalog.put(((Either.Left<JobSpec, URI>) entry).getLeft());
      } else if (entry instanceof Either.Right) {
        this.removedSpecs.inc();
        this.jobCatalog.remove(((Either.Right<JobSpec, URI>) entry).getRight());
      }
    }
  } catch (IOException ioe) {
    String rawText = new String(message.message(), Charsets.UTF_8);
    String errorText =
        String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), rawText);
    log.error(errorText, ioe);
  }
}
代码示例来源:origin: apache/incubator-gobblin
// NOTE(review): truncated excerpt - the loop body is cut short, and the catch clause
// that binds `ioe` as well as the closing braces are not part of this listing.
@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
try {
// Parse the raw payload into a collection of Either<JobSpec, URI> entries.
Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
// Each entry is cast to Either.Left here without a preceding instanceof check.
JobSpec jobSpec = ((Either.Left<JobSpec, URI>)parsedMessage).getLeft();
// Decode the payload as UTF-8 so it can be included in the error log line below.
String messageStr = new String(message.message(), Charsets.UTF_8);
log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
代码示例来源:origin: apache/incubator-gobblin
/**
 * Consumes {@code totalSuccessful} messages from the given topic, decoding each payload
 * as UTF-8 and logging it at debug level with its 1-based position.
 *
 * @param totalSuccessful number of messages to consume
 * @param topic the topic to read from
 * @throws UnsupportedEncodingException declared by the charset-name based String constructor
 */
private void testRecordsWritten(int totalSuccessful, String topic)
    throws UnsupportedEncodingException {
  final ConsumerIterator<byte[], byte[]> iterator = kafkaTestHelper.getIteratorForTopic(topic);
  int consumed = 0;
  while (consumed < totalSuccessful) {
    String message = new String(iterator.next().message(), "UTF-8");
    consumed++;
    log.debug(String.format("%d of %d: Message consumed: %s", consumed, totalSuccessful, message));
  }
}
代码示例来源:origin: apache/incubator-gobblin
// Value bytes of the consumed message.
byte[] payload = messagePlusMeta.message();
System.out.println("Got a message of size " + payload.length + " bytes");
// Deserialize the payload for the given topic; the result is cast to GenericRecord.
GenericRecord record = (GenericRecord) deserializer.deserialize(topic, payload);
代码示例来源:origin: Graylog2/graylog2-server
// Take the message value as a byte array.
final byte[] bytes = message.message();
代码示例来源:origin: apache/incubator-gobblin
/**
 * Pushes two messages through a {@code KafkaPusher} and checks that they are consumed
 * from {@code iterator} in the same order.
 *
 * <p>Availability of the next message is checked with {@code Assert.assertTrue} rather
 * than the Java {@code assert} keyword, so the check also runs when JVM assertions are
 * disabled. The pusher is closed in a {@code finally} block so it is released even when
 * an assertion fails.
 *
 * @throws IOException if closing the pusher fails
 */
@Test
public void test() throws IOException {
  KafkaPusher pusher = new KafkaPusher("localhost:" + kafkaPort, TOPIC);
  try {
    String msg1 = "msg1";
    String msg2 = "msg2";

    pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));

    // Presumably gives the pushed messages time to become consumable - TODO confirm.
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(new String(iterator.next().message()), msg1);

    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(new String(iterator.next().message()), msg2);
  } finally {
    pusher.close();
  }
}
代码示例来源:origin: apache/incubator-pinot
// NOTE(review): truncated excerpt - the metric call is cut off mid-expression and the
// rest of the try block is not part of this listing.
if (kafkaIterator.hasNext()) {
try {
// Decode the next message's payload; the decoder is handed the existing `destination`
// and its return value replaces it.
destination = _messageDecoder.decode(kafkaIterator.next().message(), destination);
tableAndStreamRowsConsumed = _serverMetrics
.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
代码示例来源:origin: apache/incubator-gobblin
/**
 * Pushes two messages through a {@code KafkaProducerPusher} whose scoped config overrides
 * the bootstrap servers, then checks that both messages are consumed in order.
 *
 * <p>Availability of the next message is checked with {@code Assert.assertTrue} rather
 * than the Java {@code assert} keyword, so the check also runs when JVM assertions are
 * disabled. The pusher is closed in a {@code finally} block so it is released even when
 * an assertion fails.
 *
 * @throws IOException if closing the pusher fails
 */
@Test
public void test() throws IOException {
  // Test that the scoped config overrides the generic config
  Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
  try {
    String msg1 = "msg1";
    String msg2 = "msg2";

    pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));

    // Presumably gives the pushed messages time to become consumable - TODO confirm.
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);

    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(new String(iterator.next().message()), msg1);

    Assert.assertTrue(iterator.hasNext());
    Assert.assertEquals(new String(iterator.next().message()), msg2);
  } finally {
    pusher.close();
  }
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Pushes two key/value pairs through a {@code KafkaKeyValueProducerPusher} whose scoped
 * config overrides the bootstrap servers, then checks that both keys and both payloads
 * are consumed in order.
 *
 * <p>Availability of the next message is checked with {@code Assert.assertTrue} rather
 * than the Java {@code assert} keyword, so the check also runs when JVM assertions are
 * disabled. The pusher is closed in a {@code finally} block so it is released even when
 * an assertion fails.
 *
 * @throws IOException if closing the pusher fails
 */
@Test
public void test() throws IOException {
  // Test that the scoped config overrides the generic config
  Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
      Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
  try {
    String msg1 = "msg1";
    String msg2 = "msg2";

    pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), Pair.of("key2", msg2.getBytes())));

    // Presumably gives the pushed messages time to become consumable - TODO confirm.
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);

    Assert.assertTrue(iterator.hasNext());
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
    Assert.assertEquals(new String(messageAndMetadata.key()), "key1");
    Assert.assertEquals(new String(messageAndMetadata.message()), msg1);

    Assert.assertTrue(iterator.hasNext());
    messageAndMetadata = iterator.next();
    Assert.assertEquals(new String(messageAndMetadata.key()), "key2");
    Assert.assertEquals(new String(messageAndMetadata.message()), msg2);
  } finally {
    pusher.close();
  }
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Writes one string through a {@code Kafka08DataWriter} configured with the
 * {@code StringSerializer}, verifies that the callback saw exactly one success and no
 * failure, and checks that the message read back from the topic equals the string sent.
 */
@Test
public void testStringSerialization()
    throws IOException, InterruptedException {
  final String topic = "testStringSerialization08";
  _kafkaTestHelper.provisionTopic(topic);

  final Properties writerProps = new Properties();
  writerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
      "localhost:" + _kafkaTestHelper.getKafkaServerPort());
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");

  final Kafka08DataWriter<String> writer = new Kafka08DataWriter<String>(writerProps);
  final String expected = "foobar";
  final WriteCallback callback = mock(WriteCallback.class);

  // The writer is closed before the callback is verified and the topic is read back.
  try {
    writer.write(expected, callback);
  } finally {
    writer.close();
  }

  verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
  verify(callback, never()).onFailure(isA(Exception.class));

  final byte[] rawMessage = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
  final String received = new String(rawMessage);
  Assert.assertEquals(received, expected);
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Writes one string through a {@code Kafka09DataWriter} configured with the
 * {@code StringSerializer} and flushes it. It then verifies the callback saw exactly one
 * success and no failure, that the returned future is done, and that the message read
 * back from the topic equals the string sent. The writer is closed in {@code finally}.
 */
@Test
public void testStringSerialization()
    throws IOException, InterruptedException, ExecutionException {
  final String topic = "testStringSerialization08";
  _kafkaTestHelper.provisionTopic(topic);

  final Properties writerProps = new Properties();
  writerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
      "localhost:" + _kafkaTestHelper.getKafkaServerPort());
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");

  final Kafka09DataWriter<String> writer = new Kafka09DataWriter<String>(writerProps);
  final String expected = "foobar";
  final WriteCallback callback = mock(WriteCallback.class);

  try {
    final Future<WriteResponse> ack = writer.write(expected, callback);
    writer.flush();

    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
    verify(callback, never()).onFailure(isA(Exception.class));
    Assert.assertTrue(ack.isDone(), "Future should be done");
    System.out.println(ack.get().getStringResponse());

    final byte[] rawMessage = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
    final String received = new String(rawMessage);
    Assert.assertEquals(received, expected);
  } finally {
    writer.close();
  }
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Writes a random byte array through a {@code Kafka09DataWriter} configured with the
 * {@code ByteArraySerializer}, verifies that the callback saw exactly one success and
 * no failure, and checks that the bytes read back from the topic equal the bytes sent.
 */
@Test
public void testBinarySerialization()
    throws IOException, InterruptedException {
  final String topic = "testBinarySerialization08";
  _kafkaTestHelper.provisionTopic(topic);

  final Properties writerProps = new Properties();
  writerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
      "localhost:" + _kafkaTestHelper.getKafkaServerPort());
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer");

  final Kafka09DataWriter<byte[]> writer = new Kafka09DataWriter<byte[]>(writerProps);
  final WriteCallback callback = mock(WriteCallback.class);
  final byte[] expected = TestUtils.generateRandomBytes();

  // The writer is closed before the callback is verified and the topic is read back.
  try {
    writer.write(expected, callback);
  } finally {
    writer.close();
  }

  verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
  verify(callback, never()).onFailure(isA(Exception.class));

  final byte[] received = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
  Assert.assertEquals(received, expected);
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Writes a random byte array through a {@code Kafka08DataWriter} configured with the
 * {@code ByteArraySerializer}, verifies that the callback saw exactly one success and
 * no failure, and checks that the bytes read back from the topic equal the bytes sent.
 */
@Test
public void testBinarySerialization()
    throws IOException, InterruptedException {
  final String topic = "testBinarySerialization08";
  _kafkaTestHelper.provisionTopic(topic);

  final Properties writerProps = new Properties();
  writerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers",
      "localhost:" + _kafkaTestHelper.getKafkaServerPort());
  writerProps.setProperty(
      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer");

  final Kafka08DataWriter<byte[]> writer = new Kafka08DataWriter<byte[]>(writerProps);
  final WriteCallback callback = mock(WriteCallback.class);
  final byte[] expected = TestUtils.generateRandomBytes();

  // The writer is closed before the callback is verified and the topic is read back.
  try {
    writer.write(expected, callback);
  } finally {
    writer.close();
  }

  verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
  verify(callback, never()).onFailure(isA(Exception.class));

  final byte[] received = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
  Assert.assertEquals(received, expected);
}
代码示例来源:origin: apache/incubator-gobblin
// The failure callback must never have been invoked.
verify(callback, never()).onFailure(isA(Exception.class));
// Read the next message from a fresh iterator over the topic.
byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
// Build a deserializer on top of a schema registry created from the topic and the record's schema.
ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
代码示例来源:origin: apache/incubator-gobblin
// The failure callback must never have been invoked.
verify(callback, never()).onFailure(isA(Exception.class));
// Read the next message from a fresh iterator over the topic.
byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
// Build a deserializer on top of a schema registry created from the topic and the record's schema.
ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
代码示例来源:origin: pinterest/secor
/**
 * Fetches the next Kafka message from {@code mIterator} and wraps it in a {@code Message}.
 *
 * <p>The timestamp is taken from the configured timestamp factory when
 * {@code mConfig.useKafkaTimestamp()} is true; otherwise it is {@code 0}.
 *
 * @return a {@code Message} carrying topic, partition, offset, key, payload and timestamp
 * @throws LegacyConsumerTimeoutException wrapping a {@code ConsumerTimeoutException}
 *         raised by the iterator
 */
@Override
public Message next() {
  MessageAndMetadata<byte[], byte[]> fetched;
  try {
    fetched = mIterator.next();
  } catch (ConsumerTimeoutException e) {
    throw new LegacyConsumerTimeoutException(e);
  }

  final long timestamp = mConfig.useKafkaTimestamp()
      ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(fetched)
      : 0L;

  return new Message(
      fetched.topic(),
      fetched.partition(),
      fetched.offset(),
      fetched.key(),
      fetched.message(),
      timestamp);
}
代码示例来源:origin: uber/chaperone
/**
 * Processes one audit message: the payload is decoded as UTF-8 and parsed as JSON, then
 * the record is submitted to {@code auditReporter} unless one of the checks below stops it.
 *
 * <ul>
 *   <li>A record whose topic name is in {@code blacklistedTopics} is dropped (logged at debug).</li>
 *   <li>When a deduplicator is configured, a record it reports as duplicated is dropped.</li>
 *   <li>The record is submitted only when {@code enablePersistentStore} is set.</li>
 * </ul>
 *
 * @param mm the consumed Kafka message with its metadata; its payload is cast to {@code byte[]}
 * @throws Exception propagated from parsing, deduplication or submission
 */
private void processAuditMsg(final MessageAndMetadata mm) throws Exception {
  final JSONObject auditRecord = JSON.parseObject(
      StringUtils.toEncodedString((byte[]) mm.message(), Charset.forName("UTF-8")));
  final String auditedTopic = auditRecord.getString(AuditMsgField.TOPICNAME.getName());

  if (blacklistedTopics.contains(auditedTopic)) {
    logger.debug("Topic={} is blacklisted", auditedTopic);
    return;
  }

  if (deduplicator != null) {
    final String uuid = auditRecord.getString(AuditMsgField.UUID.getName());
    final String host = auditRecord.getString(AuditMsgField.HOSTNAME.getName());
    final boolean alreadySeen =
        deduplicator.isDuplicated(auditedTopic, mm.partition(), mm.offset(), host, uuid);
    if (alreadySeen) {
      return;
    }
  }

  if (!enablePersistentStore) {
    return;
  }
  auditReporter.submit(mm.topic(), mm.partition(), mm.offset(), auditRecord);
}
内容来源于网络,如有侵权,请联系作者删除!