Usage of the kafka.message.MessageAndMetadata.topic() method, with code examples

This article collects a number of code examples of the Java method kafka.message.MessageAndMetadata.topic() and shows how it is used in practice. The examples are taken from selected projects hosted on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as a useful reference. Details of MessageAndMetadata.topic() are as follows:
Package path: kafka.message.MessageAndMetadata
Class name: MessageAndMetadata
Method name: topic

About MessageAndMetadata.topic

No official description is available; the method simply returns the name of the topic from which the consumed message was read.
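
Before the project excerpts, here is a minimal, self-contained sketch (not taken from any of the projects below) of how a MessageAndMetadata record is typically obtained through the old high-level consumer API (kafka.consumer.*) and how topic() is read from it. The topic name "demo-topic", the ZooKeeper address, and the consumer group id are placeholder values for illustration only.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class TopicExample {
    public static void main(String[] args) {
        // Old-style high-level consumer configuration (placeholder values).
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "topic-example-group");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Request a single stream for the topic we want to read.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("demo-topic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("demo-topic").get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            // topic() returns the name of the topic the record was read from;
            // partition() and offset() locate the record within that topic.
            System.out.printf("topic=%s partition=%d offset=%d%n",
                    record.topic(), record.partition(), record.offset());
        }
    }
}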

Code examples

Code example source: pinterest/secor

@Override
public Message next() {
  MessageAndMetadata<byte[], byte[]> kafkaMessage;
  try {
    kafkaMessage = mIterator.next();
  } catch (ConsumerTimeoutException e) {
    throw new LegacyConsumerTimeoutException(e);
  }
  long timestamp = 0L;
  if (mConfig.useKafkaTimestamp()) {
    timestamp = mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(kafkaMessage);
  }
  return new Message(kafkaMessage.topic(), kafkaMessage.partition(),
      kafkaMessage.offset(), kafkaMessage.key(),
      kafkaMessage.message(), timestamp);
}

Code example source: uber/chaperone

private void processAuditMsg(final MessageAndMetadata mm) throws Exception {
 JSONObject record = JSON.parseObject(StringUtils.toEncodedString((byte[]) mm.message(), Charset.forName("UTF-8")));
 String topicName = record.getString(AuditMsgField.TOPICNAME.getName());
 if (blacklistedTopics.contains(topicName)) {
  logger.debug("Topic={} is blacklisted", topicName);
  return;
 }
 if (deduplicator != null) {
  String uuid = record.getString(AuditMsgField.UUID.getName());
  String host = record.getString(AuditMsgField.HOSTNAME.getName());
  if (deduplicator.isDuplicated(topicName, mm.partition(), mm.offset(), host, uuid)) {
   return;
  }
 }
 if (enablePersistentStore) {
  auditReporter.submit(mm.topic(), mm.partition(), mm.offset(), record);
 }
}

Code example source: apache/incubator-edgent (the same snippet also ships in the Maven artifact org.apache.edgent/edgent-connectors-kafka)

@Override
public String topic() { return rec.topic(); };
@Override

Code example source: gwenshap/kafka-examples

public List<ProducerRecord<byte[], byte[]>> handle(MessageAndMetadata<byte[], byte[]> record) {
  return Collections.singletonList(new ProducerRecord<byte[], byte[]>(topicPrefix + "." + record.topic(), record.partition(), record.key(), record.message()));
}

Code example source: elodina/dropwizard-kafka-http

public Message(MessageAndMetadata<byte[], byte[]> message) {
    this.topic = message.topic();
    this.key = message.key() != null ? new String(message.key(), Charset.forName("utf-8")) : null;
    this.message = new String(message.message(), Charset.forName("utf-8"));
    this.partition = message.partition();
    this.offset = message.offset();
  }
}

Code example source: salesforce/Argus (the same snippet also ships in the Maven artifact com.salesforce.argus/argus-core)

@Override
  public void run() {
    ConsumerIterator<byte[], byte[]> it = _stream.iterator();
    while (it.hasNext()) {
      Thread.yield();
      if (Thread.currentThread().isInterrupted()) {
        _logger.info("Interrupted... Will exit now.");
        break;
      }
      MessageAndMetadata<byte[], byte[]> m = it.next();
      try {
        String message = new String(m.message());
        String topic = m.topic();
        if (message != null) {
          _topics.get(topic).getMessages().put(message);
          long c = count.incrementAndGet();
          if (c % 50000 == 0) {
            _logger.debug("Read {} messages.", count.get());
          }
          if (_topics.get(topic).getMessages().size() % 1000 == 0) {
            _logger.debug("Message queued. Queue size = {}", _topics.get(topic).getMessages().size());
          }
        }
      } catch (InterruptedException ie) {
        _logger.debug("Interrupted while consuming message.");
        Thread.currentThread().interrupt();
      }
    }
  }
}

Code example source: b/kafka-websocket

@Override
@SuppressWarnings("unchecked")
public void run() {
  String subprotocol = session.getNegotiatedSubprotocol();
  for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : (Iterable<MessageAndMetadata<byte[], byte[]>>) stream) {
    String topic = messageAndMetadata.topic();
    byte[] message = messageAndMetadata.message();
    switch(subprotocol) {
      case "kafka-binary":
        sendBinary(topic, message);
        break;
      default:
        sendText(topic, message);
        break;
    }
    if (Thread.currentThread().isInterrupted()) {
      try {
        session.close();
      } catch (IOException e) {
        LOG.error("Error terminating session: {}", e.getMessage());
      }
      return;
    }
  }
}

Code example source: habren/KafkaExample

String message = String.format(
    "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
    messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
    messageAndMetadata.offset(), new String(messageAndMetadata.key()),
    new String(messageAndMetadata.message()));

Code example source: apache/incubator-edgent (the same snippet also ships in the Maven artifact org.apache.edgent/edgent-connectors-kafka)

void accept(MessageAndMetadata<byte[],byte[]> rec) {
  try {
    trace.trace("{} received rec for topic:{} partition:{} offset:{}",
          id(), rec.topic(), rec.partition(), rec.offset());
    T tuple;
    if (stringToTupleFn != null)
      tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
    else
      tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
    eventSubmitter.accept(tuple);
  }
  catch (Exception e) {
    String tp = String.format("[%s,%d]", rec.topic(), rec.partition());
    trace.error("{} failure processing record from {}", id(), tp, e);
  }
}

Code example source: vakinge/jeesuite-libs

@Override
  public void run() {
    try {	
      long start = logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
      messageHandler.p2Process(message);
      if(logger.isDebugEnabled()){
        long useTime = System.currentTimeMillis() - start;
        if(useTime > 1000)logger.debug("received_topic_useTime [{}]process topic:{} use time {} ms",processorName,topicName,useTime);
      }
      //回执
      if(message.isConsumerAckRequired()){
        consumerContext.sendConsumerAck(message.getMsgId());
      }
      consumerContext.saveOffsetsAfterProcessed(messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
    } catch (Exception e) {
      boolean processed = messageHandler.onProcessError(message);
      if(processed == false){
        consumerContext.processErrorMessage(topicName, message);
      }
      logger.error("received_topic_process_error ["+processorName+"]processMessage error,topic:"+topicName,e);
    }
    
    consumerContext.updateConsumerStats(messageAndMeta.topic(),-1);
  
  }
});

Code example source: vakinge/jeesuite-libs

message = new DefaultMessage(messageAndMeta.key(),(Serializable) _message);
message.setTopicMetadata(messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
consumerContext.updateConsumerStats(messageAndMeta.topic(),1);
consumerContext.saveOffsetsBeforeProcessed(messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());

Code example source: com.github.hackerwin7/jlib-utils

public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
      try {
        // Fetch the record once so that key, value, offset, partition and topic
        // all come from the same message; calling it.next() for every field
        // would consume a different record each time.
        MessageAndMetadata<byte[], byte[]> mm = it.next();
        KafkaMsg msg = KafkaMsg.createBuilder()
            .key(new String(mm.key()))
            .val(mm.message())
            .offset(mm.offset())
            .partition(mm.partition())
            .topic(mm.topic())
            .build();
        while (true) {//retry put
          try {
            queue.put(msg);
            break;
          } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            try {
              Thread.sleep(SLEEPING_INTERVAL);
            } catch (InterruptedException ee) {
              logger.error(e.getMessage(), e);
            }
          }
        }
      } catch (Throwable e) {
        logger.error(e.getMessage(), e);
      }
    }
  }
}

Code example source: coderczp/dlog

@Override
  public void run() {
    ConsumerConfig cfg = new ConsumerConfig(props);
    consumer = Consumer.createJavaConsumerConnector(cfg);
    TopicFilter arg0 = new Whitelist(topic);
    List<KafkaStream<byte[], byte[]>> partitions = consumer.createMessageStreamsByFilter(arg0);
    while (!Thread.interrupted()) {
      for (KafkaStream<byte[], byte[]> partition : partitions) {
        ConsumerIterator<byte[], byte[]> it = partition.iterator();
        while (it.hasNext()) {
          MessageAndMetadata<byte[], byte[]> msg = it.next();
          onMessage(msg.topic(), new String(msg.message()));
        }
      }
    }
  }
}, "consumer-" + topic);
