Usage of the kafka.message.MessageAndMetadata class, with code examples


This article collects Java code examples for the kafka.message.MessageAndMetadata class, showing how the class is used in practice. The examples are taken from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful reference material. Details of the MessageAndMetadata class:
Package path: kafka.message.MessageAndMetadata
Class name: MessageAndMetadata

About MessageAndMetadata

MessageAndMetadata belongs to the legacy high-level consumer API, the Scala client that shipped with Kafka 0.8.x/0.9.x. Each call to ConsumerIterator.next() returns one MessageAndMetadata instance, which bundles a consumed record's payload (message()) and key (key()) with the metadata describing where the record came from: topic(), partition(), and offset().
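
All the examples below follow the same access pattern, so as a point of reference here is a minimal, self-contained sketch of consuming records with the legacy high-level consumer. The ZooKeeper address, group id, and topic name are placeholders assumed for illustration, not values taken from the quoted projects.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MessageAndMetadataSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
    props.put("group.id", "example-group");           // placeholder consumer group

    ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    // Request a single stream for the (placeholder) topic "example-topic"
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(Collections.singletonMap("example-topic", 1));
    ConsumerIterator<byte[], byte[]> it = streams.get("example-topic").get(0).iterator();

    while (it.hasNext()) {
      // Each record arrives wrapped in a MessageAndMetadata
      MessageAndMetadata<byte[], byte[]> mam = it.next();
      System.out.printf("topic=%s partition=%d offset=%d payload=%s%n",
          mam.topic(), mam.partition(), mam.offset(), new String(mam.message()));
    }
    connector.shutdown();
  }
}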

Code examples

Code example source: apache/incubator-gobblin
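This snippet drains one record from a ConsumerIterator, decodes its Avro payload into a GenericRecord, and checkpoints the partition and offset it was read from: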

if (!iterator.hasNext()) {
 return;
}
messagePlusMeta = iterator.next();
if (messagePlusMeta != null) {
 byte[] payload = messagePlusMeta.message();
 System.out.println("Got a message of size " + payload.length + " bytes");
 // decode the Avro-encoded payload into a GenericRecord
 GenericRecord record = (GenericRecord) deserializer.deserialize(topic, payload);
 System.out.println(record.toString());
 // record the partition/offset so consumption can resume from this point
 checkpoint.update(messagePlusMeta.partition(), messagePlusMeta.offset());
}

Code example source: pinterest/secor
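Secor adapts the legacy iterator to its own Message type, translating consumer timeouts into a dedicated exception and optionally stamping each message with the Kafka-provided timestamp: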

@Override
public Message next() {
  MessageAndMetadata<byte[], byte[]> kafkaMessage;
  try {
    kafkaMessage = mIterator.next();
  } catch (ConsumerTimeoutException e) {
    throw new LegacyConsumerTimeoutException(e);
  }
  long timestamp = 0L;
  if (mConfig.useKafkaTimestamp()) {
    timestamp = mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage);
  }
  return new Message(kafkaMessage.topic(), kafkaMessage.partition(),
      kafkaMessage.offset(), kafkaMessage.key(),
      kafkaMessage.message(), timestamp);
}

Code example source: apache/incubator-gobblin
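Gobblin's job-spec monitor parses each Kafka message into either a JobSpec to add to the catalog or a URI to remove from it, logging the raw payload when parsing fails: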

@Override
protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
 try {
  Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
  for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
   if (parsedMessage instanceof Either.Left) {
    this.newSpecs.inc();
    this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
   } else if (parsedMessage instanceof Either.Right) {
    this.removedSpecs.inc();
    this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
   }
  }
 } catch (IOException ioe) {
  String messageStr = new String(message.message(), Charsets.UTF_8);
  log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
 }
}

Code example source: Graylog2/graylog2-server

while (consumerIterator.hasNext()) {
  if (paused) {
    // pause handling elided in the original snippet; skip consuming while paused
    continue;
  }
  final MessageAndMetadata<byte[], byte[]> message = consumerIterator.next();
  final byte[] bytes = message.message();
}

Code example source: apache/incubator-druid
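Druid's Kafka firehose pulls one raw message at a time, hands the bytes to a parser that may yield several rows, and deliberately skips any record that fails its checksum rather than halting ingestion: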

@Nullable
@Override
public InputRow nextRow()
{
 try {
  if (!nextIterator.hasNext()) {
   final byte[] message = iter.next().message();
   if (message == null) {
    return null;
   }
   nextIterator = theParser.parseBatch(ByteBuffer.wrap(message)).iterator();
  }
  return nextIterator.next();
 }
 catch (InvalidMessageException e) {
  /*
  If the CRC error was introduced during wire transfer, this is not the best way to handle it.
  It would probably be better to shut down the firehose without committing and start it again.
   */
  log.error(e, "Message failed its checksum and it is corrupt, will skip it");
  return null;
 }
}

Code example source: apache/incubator-gobblin

@Test
public void test() throws IOException {
 // Test that the scoped config overrides the generic config
 Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
   Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
 String msg1 = "msg1";
 String msg2 = "msg2";
 pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), Pair.of("key2", msg2.getBytes())));
 try {
  Thread.sleep(1000);
 } catch(InterruptedException ex) {
  Thread.currentThread().interrupt();
 }
 ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 assert(iterator.hasNext());
 MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
 Assert.assertEquals(new String(messageAndMetadata.key()), "key1");
 Assert.assertEquals(new String(messageAndMetadata.message()), msg1);
 assert(iterator.hasNext());
 messageAndMetadata = iterator.next();
 Assert.assertEquals(new String(messageAndMetadata.key()), "key2");
 Assert.assertEquals(new String(messageAndMetadata.message()), msg2);
 pusher.close();
}

Code example source: salesforce/Argus
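Argus consumes a stream on a dedicated thread, enqueuing each payload under its topic and logging progress periodically; the loop checks for interruption so the thread can exit cleanly: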

@Override
  public void run() {
    ConsumerIterator<byte[], byte[]> it = _stream.iterator();
    while (it.hasNext()) {
      Thread.yield();
      if (Thread.currentThread().isInterrupted()) {
        _logger.info("Interrupted... Will exit now.");
        break;
      }
      MessageAndMetadata<byte[], byte[]> m = it.next();
      try {
        String message = new String(m.message());
        String topic = m.topic();
        if (message != null) {
          _topics.get(topic).getMessages().put(message);
          long c = count.incrementAndGet();
          if (c % 50000 == 0) {
            _logger.debug("Read {} messages.", count.get());
          }
          if (_topics.get(topic).getMessages().size() % 1000 == 0) {
            _logger.debug("Message queued. Queue size = {}", _topics.get(topic).getMessages().size());
          }
        }
      } catch (InterruptedException ie) {
        _logger.debug("Interrupted while consuming message.");
        Thread.currentThread().interrupt();
      }
    }
  }
}

Code example source: apache/nifi
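NiFi appends each consumed payload to an outgoing FlowFile, writing a demarcator before every message except the first: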

@Override
  public void process(final OutputStream out) throws IOException {
    if (!firstMessage) {
      out.write(demarcatorBytes);
    }
    out.write(mam.message());
  }
});

Code example source: hadooparchitecturebook/fraud-detection-tutorial

boolean e = this.hasNext();
if (e) {
 // "endTime" is a decompiler-generated name; it is simply the consumed record
 MessageAndMetadata endTime = this.it.next();
 byte[] kafkaMessage = (byte[]) endTime.message();
 byte[] kafkaKey = (byte[]) endTime.key();
 HashMap headers = new HashMap();
 headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
}

Code example source: com.hurence.logisland/logisland-kafka-0-8-plugin
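logisland re-wraps a consumed record as a KeyedMessage so it can be re-published to another topic, preserving the original key: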

@Override
  public KeyedMessage<String, String> extract(MessageAndMetadata<String, String> messageAndMetadata) {
    return new KeyedMessage<>(topicName, messageAndMetadata.key(), messageAndMetadata.message());
  }
});

Code example source: uk.camsw.rx/rx-test-kafka

public static String messageAndPartition(MessageAndMetadata<byte[], byte[]> mamd) {
  return asUtf8(mamd.message()) + "@part" + mamd.partition();
}

Code example source: b/kafka-websocket
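kafka-websocket relays each consumed record to a WebSocket session, picking binary or text framing based on the negotiated subprotocol and closing the session when the thread is interrupted: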

@Override
@SuppressWarnings("unchecked")
public void run() {
  String subprotocol = session.getNegotiatedSubprotocol();
  for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : (Iterable<MessageAndMetadata<byte[], byte[]>>) stream) {
    String topic = messageAndMetadata.topic();
    byte[] message = messageAndMetadata.message();
    switch(subprotocol) {
      case "kafka-binary":
        sendBinary(topic, message);
        break;
      default:
        sendText(topic, message);
        break;
    }
    if (Thread.currentThread().isInterrupted()) {
      try {
        session.close();
      } catch (IOException e) {
        LOG.error("Error terminating session: {}", e.getMessage());
      }
      return;
    }
  }
}

Code example source: apache/incubator-edgent
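Edgent wraps each consumed record in a StringConsumerRecord or ByteConsumerRecord adapter, converts it to a tuple with the configured function, and submits the tuple downstream: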

void accept(MessageAndMetadata<byte[],byte[]> rec) {
  try {
    trace.trace("{} received rec for topic:{} partition:{} offset:{}",
          id(), rec.topic(), rec.partition(), rec.offset());
    T tuple;
    if (stringToTupleFn != null)
      tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
    else
      tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
    eventSubmitter.accept(tuple);
  }
  catch (Exception e) {
    String tp = String.format("[%s,%d]", rec.topic(), rec.partition());
    trace.error("{} failure processing record from {}", id(), tp, e);
  }
}

Code example source: apache/incubator-edgent
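The following short fragments come from those Edgent adapter classes; each accessor simply delegates to the wrapped MessageAndMetadata held in the field rec: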

@Override
public byte[] key() { return rec.key(); }

Code example source: org.apache.edgent/edgent-connectors-kafka

@Override
public long offset() { return rec.offset(); }
}

Code example source: apache/incubator-edgent

@Override
public int partition() { return rec.partition(); }

Code example source: org.apache.edgent/edgent-connectors-kafka

@Override
public String topic() { return rec.topic(); }

Code example source: apache/incubator-gobblin

@Test
public void test() throws IOException {
 KafkaPusher pusher = new KafkaPusher("localhost:" + kafkaPort, TOPIC);
 String msg1 = "msg1";
 String msg2 = "msg2";
 pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
 try {
  Thread.sleep(1000);
 } catch(InterruptedException ex) {
  Thread.currentThread().interrupt();
 }
 // the iterator's creation was elided in this snippet; presumably as in the test above:
 ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 assert(iterator.hasNext());
 Assert.assertEquals(new String(iterator.next().message()), msg1);
 assert(iterator.hasNext());
 Assert.assertEquals(new String(iterator.next().message()), msg2);
 pusher.close();
}

Code example source: apache/incubator-gobblin

private void testRecordsWritten(int totalSuccessful, String topic)
  throws UnsupportedEncodingException {
 final ConsumerIterator<byte[], byte[]> iterator = kafkaTestHelper.getIteratorForTopic(topic);
 for (int i = 0; i < totalSuccessful; ++i) {
  String message = new String(iterator.next().message(), "UTF-8");
  log.debug(String.format("%d of %d: Message consumed: %s", (i+1), totalSuccessful, message));
 }
}

Code example source: stackoverflow.com
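This Stack Overflow snippet assumes the consumer was created with decoders (presumably a String key decoder and an Avro value decoder), so the key and message can be cast directly: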

while (it.hasNext()) {
  MessageAndMetadata messageAndMetadata = it.next();
  try {
    String key = (String) messageAndMetadata.key();
    IndexedRecord value = (IndexedRecord) messageAndMetadata.message();
    // processing of key/value was truncated in the original snippet
  } catch (Exception e) {
    // exception handling elided in the original snippet
  }
}
