本文整理了Java中kafka.message.MessageAndMetadata.topic()方法的一些代码示例,展示了MessageAndMetadata.topic()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MessageAndMetadata.topic()方法的具体详情如下:
包路径:kafka.message.MessageAndMetadata
类名称:MessageAndMetadata
方法名:topic
方法说明:暂无
代码示例来源:origin: pinterest/secor
/**
 * Pulls the next record from the underlying Kafka iterator and wraps it in a {@code Message}.
 *
 * <p>The timestamp stays at {@code 0L} unless {@code mConfig.useKafkaTimestamp()} is enabled, in
 * which case it is resolved through the Kafka message timestamp factory.
 *
 * @return a message built from the record's topic, partition, offset, key, payload and timestamp
 * @throws LegacyConsumerTimeoutException if the iterator raises a {@code ConsumerTimeoutException}
 */
@Override
public Message next() {
    final MessageAndMetadata<byte[], byte[]> record;
    try {
        record = mIterator.next();
    } catch (ConsumerTimeoutException e) {
        // Re-wrap so callers only ever see the legacy timeout type.
        throw new LegacyConsumerTimeoutException(e);
    }

    final long timestamp = mConfig.useKafkaTimestamp()
            ? mKafkaMessageTimestampFactory.getKafkaMessageTimestamp().getTimestamp(record)
            : 0L;

    return new Message(
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.message(),
            timestamp);
}
代码示例来源:origin: uber/chaperone
/**
 * Handles one audit record: parses the payload as JSON, drops it when its topic is blacklisted
 * or when the deduplicator reports it as a duplicate, and otherwise submits it to the audit
 * reporter if the persistent store is enabled.
 *
 * @param mm the consumed record; its payload is cast to {@code byte[]} and decoded as UTF-8
 * @throws Exception propagated from parsing, de-duplication or submission
 */
private void processAuditMsg(final MessageAndMetadata mm) throws Exception {
    final byte[] payload = (byte[]) mm.message();
    final String json = StringUtils.toEncodedString(payload, Charset.forName("UTF-8"));
    final JSONObject auditRecord = JSON.parseObject(json);

    // The audited topic is carried inside the payload; it is not the topic the record arrived on.
    final String auditedTopic = auditRecord.getString(AuditMsgField.TOPICNAME.getName());
    if (blacklistedTopics.contains(auditedTopic)) {
        logger.debug("Topic={} is blacklisted", auditedTopic);
        return;
    }

    if (deduplicator != null) {
        final String uuid = auditRecord.getString(AuditMsgField.UUID.getName());
        final String host = auditRecord.getString(AuditMsgField.HOSTNAME.getName());
        final boolean duplicate =
                deduplicator.isDuplicated(auditedTopic, mm.partition(), mm.offset(), host, uuid);
        if (duplicate) {
            return;
        }
    }

    if (!enablePersistentStore) {
        return;
    }
    auditReporter.submit(mm.topic(), mm.partition(), mm.offset(), auditRecord);
}
代码示例来源:origin: apache/incubator-edgent
/** Returns the topic reported by the wrapped {@code rec}. */
@Override
public String topic() {
    return rec.topic();
}
@Override
代码示例来源:origin: org.apache.edgent/edgent-connectors-kafka
/**
 * Delegates to the wrapped {@code rec}.
 *
 * @return the value of {@code rec.topic()}
 */
@Override
public String topic() {
    return rec.topic();
}
@Override
代码示例来源:origin: gwenshap/kafka-examples
/**
 * Maps a consumed record to exactly one producer record aimed at the prefixed topic.
 *
 * <p>The target topic is {@code topicPrefix + "." + record.topic()}; partition, key and payload
 * are passed through unchanged.
 *
 * @param record the consumed record
 * @return a single-element list holding the producer record
 */
public List<ProducerRecord<byte[], byte[]>> handle(MessageAndMetadata<byte[], byte[]> record) {
    final String targetTopic = topicPrefix + "." + record.topic();
    final ProducerRecord<byte[], byte[]> outgoing = new ProducerRecord<byte[], byte[]>(
            targetTopic, record.partition(), record.key(), record.message());
    return Collections.singletonList(outgoing);
}
代码示例来源:origin: elodina/dropwizard-kafka-http
public Message(MessageAndMetadata<byte[], byte[]> message) {
this.topic = message.topic();
this.key = message.key() != null ? new String(message.key(), Charset.forName("utf-8")) : null;
this.message = new String(message.message(), Charset.forName("utf-8"));
this.partition = message.partition();
this.offset = message.offset();
}
}
代码示例来源:origin: salesforce/Argus
/**
 * Consumes records from {@code _stream} and puts each payload, decoded with the platform default
 * charset, on the message queue registered under the record's topic in {@code _topics}.
 *
 * <p>The loop exits once the current thread is found interrupted. An interrupt raised while
 * enqueueing restores the interrupt flag, so the next pass through the loop exits.
 */
@Override
public void run() {
    for (ConsumerIterator<byte[], byte[]> records = _stream.iterator(); records.hasNext(); ) {
        Thread.yield();
        if (Thread.currentThread().isInterrupted()) {
            _logger.info("Interrupted... Will exit now.");
            break;
        }

        final MessageAndMetadata<byte[], byte[]> record = records.next();
        try {
            final String payload = new String(record.message());
            final String topicName = record.topic();
            // A freshly constructed String is never null; the check is kept from the original.
            if (payload == null) {
                continue;
            }

            _topics.get(topicName).getMessages().put(payload);

            final long consumed = count.incrementAndGet();
            if (consumed % 50000 == 0) {
                _logger.debug("Read {} messages.", count.get());
            }
            if (_topics.get(topicName).getMessages().size() % 1000 == 0) {
                _logger.debug("Message queued. Queue size = {}", _topics.get(topicName).getMessages().size());
            }
        } catch (InterruptedException ie) {
            _logger.debug("Interrupted while consuming message.");
            Thread.currentThread().interrupt();
        }
    }
}
}
代码示例来源:origin: com.salesforce.argus/argus-core
/**
 * Drains {@code _stream}: every record's payload is turned into a String (platform default
 * charset) and put on the queue that {@code _topics} holds for the record's topic.
 *
 * <p>Stops when the current thread is interrupted. If the interrupt arrives during the blocking
 * put, the flag is re-set and the loop terminates on its next pass.
 */
@Override
public void run() {
    final ConsumerIterator<byte[], byte[]> cursor = _stream.iterator();
    while (cursor.hasNext()) {
        Thread.yield();
        final boolean interrupted = Thread.currentThread().isInterrupted();
        if (interrupted) {
            _logger.info("Interrupted... Will exit now.");
            break;
        }

        final MessageAndMetadata<byte[], byte[]> next = cursor.next();
        try {
            final String text = new String(next.message());
            final String topicKey = next.topic();
            if (text != null) {
                _topics.get(topicKey).getMessages().put(text);

                // Periodic progress logging: every 50000 reads.
                final long total = count.incrementAndGet();
                final boolean readMilestone = total % 50000 == 0;
                if (readMilestone) {
                    _logger.debug("Read {} messages.", count.get());
                }

                // Periodic queue-depth logging: whenever the size is a multiple of 1000.
                final boolean sizeMilestone = _topics.get(topicKey).getMessages().size() % 1000 == 0;
                if (sizeMilestone) {
                    _logger.debug("Message queued. Queue size = {}", _topics.get(topicKey).getMessages().size());
                }
            }
        } catch (InterruptedException ie) {
            _logger.debug("Interrupted while consuming message.");
            Thread.currentThread().interrupt();
        }
    }
}
}
代码示例来源:origin: b/kafka-websocket
/**
 * Forwards every record from {@code stream} to the websocket session, as a binary frame when the
 * negotiated subprotocol is {@code "kafka-binary"} and as text otherwise.
 *
 * <p>After each record the interrupt flag is checked; when set, the session is closed (close
 * failures are only logged) and the method returns.
 */
@Override
@SuppressWarnings("unchecked")
public void run() {
    final String negotiated = session.getNegotiatedSubprotocol();
    final Iterable<MessageAndMetadata<byte[], byte[]>> records =
            (Iterable<MessageAndMetadata<byte[], byte[]>>) stream;

    for (MessageAndMetadata<byte[], byte[]> record : records) {
        final String topic = record.topic();
        final byte[] payload = record.message();

        // The variable is the receiver on purpose: a null subprotocol fails here just as the
        // original switch statement did.
        if (negotiated.equals("kafka-binary")) {
            sendBinary(topic, payload);
        } else {
            sendText(topic, payload);
        }

        if (!Thread.currentThread().isInterrupted()) {
            continue;
        }
        try {
            session.close();
        } catch (IOException e) {
            LOG.error("Error terminating session: {}", e.getMessage());
        }
        return;
    }
}
代码示例来源:origin: habren/KafkaExample
// Human-readable summary of the consumed record. Key and payload are optional in Kafka, so
// both are null-guarded; a missing one is rendered as "null" by %s instead of throwing a
// NullPointerException from new String(...).
String message = String.format(
        "Topic:%s, GroupID:%s, Consumer ID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
        messageAndMetadata.topic(), groupid, consumerid, messageAndMetadata.partition(),
        messageAndMetadata.offset(),
        messageAndMetadata.key() == null ? null : new String(messageAndMetadata.key()),
        messageAndMetadata.message() == null ? null : new String(messageAndMetadata.message()));
代码示例来源:origin: apache/incubator-edgent
/**
 * Converts a consumed record to a tuple and submits it.
 *
 * <p>The string-based conversion function is used when it is set; otherwise the byte-based one.
 * Any exception raised while tracing, converting or submitting is logged together with the
 * record's topic and partition and is not rethrown.
 *
 * @param rec the consumed record
 */
void accept(MessageAndMetadata<byte[],byte[]> rec) {
    try {
        trace.trace("{} received rec for topic:{} partition:{} offset:{}",
                id(), rec.topic(), rec.partition(), rec.offset());

        final T tuple = (stringToTupleFn != null)
                ? stringToTupleFn.apply(new StringConsumerRecord(rec))
                : byteToTupleFn.apply(new ByteConsumerRecord(rec));
        eventSubmitter.accept(tuple);
    } catch (Exception e) {
        final String topicPartition = String.format("[%s,%d]", rec.topic(), rec.partition());
        trace.error("{} failure processing record from {}", id(), topicPartition, e);
    }
}
代码示例来源:origin: vakinge/jeesuite-libs
/**
 * Processes the captured message and then settles its bookkeeping.
 *
 * <p>On success: optionally logs slow processing (over 1000 ms, debug level only), sends a
 * consumer acknowledgement when the message requires one, and saves the record's offset as
 * processed. On failure: gives the handler a chance to deal with the error, hands the message to
 * the consumer context's error path if the handler did not, and logs the exception. In both
 * cases the consumer stats for the topic are updated with -1 at the end.
 */
@Override
public void run() {
    try {
        final long startedAt;
        if (logger.isDebugEnabled()) {
            startedAt = System.currentTimeMillis();
        } else {
            startedAt = 0;
        }

        messageHandler.p2Process(message);

        if (logger.isDebugEnabled()) {
            final long elapsed = System.currentTimeMillis() - startedAt;
            if (elapsed > 1000) {
                logger.debug("received_topic_useTime [{}]process topic:{} use time {} ms", processorName, topicName, elapsed);
            }
        }

        // Acknowledge receipt back to the producer side when requested.
        if (message.isConsumerAckRequired()) {
            consumerContext.sendConsumerAck(message.getMsgId());
        }
        consumerContext.saveOffsetsAfterProcessed(
                messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
    } catch (Exception e) {
        if (!messageHandler.onProcessError(message)) {
            consumerContext.processErrorMessage(topicName, message);
        }
        logger.error("received_topic_process_error ["+processorName+"]processMessage error,topic:"+topicName, e);
    }
    consumerContext.updateConsumerStats(messageAndMeta.topic(), -1);
}
});
代码示例来源:origin: org.apache.edgent/edgent-connectors-kafka
/**
 * Turns one consumed record into a tuple and hands it to the event submitter.
 *
 * <p>Uses {@code stringToTupleFn} if present, else {@code byteToTupleFn}. Failures are caught,
 * logged with the originating {@code [topic,partition]}, and swallowed.
 *
 * @param rec the consumed record
 */
void accept(MessageAndMetadata<byte[],byte[]> rec) {
    try {
        trace.trace("{} received rec for topic:{} partition:{} offset:{}",
                id(), rec.topic(), rec.partition(), rec.offset());

        final boolean useStringFn = stringToTupleFn != null;
        final T converted;
        if (useStringFn) {
            converted = stringToTupleFn.apply(new StringConsumerRecord(rec));
        } else {
            converted = byteToTupleFn.apply(new ByteConsumerRecord(rec));
        }
        eventSubmitter.accept(converted);
    } catch (Exception e) {
        final String source = String.format("[%s,%d]", rec.topic(), rec.partition());
        trace.error("{} failure processing record from {}", id(), source, e);
    }
}
代码示例来源:origin: vakinge/jeesuite-libs
// Wrap the record key and the payload object (_message, cast to Serializable) in a DefaultMessage.
message = new DefaultMessage(messageAndMeta.key(),(Serializable) _message);
// Attach the record's topic, partition and offset to the message.
message.setTopicMetadata(messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
// Update this topic's consumer stats with +1.
// NOTE(review): presumably an in-flight counter balanced by a later -1 update — confirm.
consumerContext.updateConsumerStats(messageAndMeta.topic(),1);
// Record the record's offset before the message is processed.
consumerContext.saveOffsetsBeforeProcessed(messageAndMeta.topic(), messageAndMeta.partition(), messageAndMeta.offset());
代码示例来源:origin: com.github.hackerwin7/jlib-utils
/**
 * Drains the Kafka stream: each consumed record is converted to a {@code KafkaMsg} and put on
 * {@code queue}, retrying the put until it succeeds.
 *
 * <p>Each record is fetched with a single {@code it.next()} call. The previous code called
 * {@code it.next()} once per builder field, so every {@code KafkaMsg} mixed the key, value,
 * offset, partition and topic of five different records and four out of five records were lost.
 *
 * <p>Any failure while building or enqueueing a message is logged and the loop moves on to the
 * next record.
 */
public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        try {
            // Advance the iterator exactly once per message and read every field from that record.
            // The type is fully qualified so no additional import is required.
            final kafka.message.MessageAndMetadata<byte[], byte[]> record = it.next();
            // NOTE(review): a record with a null key still fails in new String(...) and is
            // dropped by the catch below, as before — confirm whether unkeyed records can occur.
            KafkaMsg msg = KafkaMsg.createBuilder()
                    .key(new String(record.key()))
                    .val(record.message())
                    .offset(record.offset())
                    .partition(record.partition())
                    .topic(record.topic())
                    .build();
            while (true) { // retry put
                try {
                    queue.put(msg);
                    break;
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                    try {
                        Thread.sleep(SLEEPING_INTERVAL);
                    } catch (InterruptedException ee) {
                        // Log the interrupt that hit the sleep, not the earlier one from put().
                        logger.error(ee.getMessage(), ee);
                    }
                }
            }
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }
    }
}
}
代码示例来源:origin: coderczp/dlog
/**
 * Creates a consumer connector from {@code props}, opens the streams matching a whitelist filter
 * built from {@code topic}, and passes every record's topic and payload (decoded with the
 * platform default charset) to {@code onMessage}.
 *
 * <p>The outer loop repeats until {@code Thread.interrupted()} reports an interrupt.
 */
@Override
public void run() {
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    final TopicFilter whitelist = new Whitelist(topic);
    final List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(whitelist);

    while (!Thread.interrupted()) {
        for (KafkaStream<byte[], byte[]> kafkaStream : streams) {
            for (ConsumerIterator<byte[], byte[]> records = kafkaStream.iterator(); records.hasNext(); ) {
                final MessageAndMetadata<byte[], byte[]> received = records.next();
                onMessage(received.topic(), new String(received.message()));
            }
        }
    }
}
}, "consumer-" + topic);
内容来源于网络,如有侵权,请联系作者删除!