Usage of org.apache.kafka.clients.producer.Producer.send() with code examples


This article collects Java code examples that show how org.apache.kafka.clients.producer.Producer.send() is used in practice. The examples are taken from selected open-source projects hosted on GitHub, Stack Overflow, Maven and similar platforms, and are intended as a practical reference. Details of Producer.send() are as follows:
Package: org.apache.kafka.clients.producer.Producer
Class name: Producer
Method name: send

About Producer.send

See KafkaProducer#send(ProducerRecord): the record is sent to the topic asynchronously and a Future<RecordMetadata> is returned; calling get() on that Future blocks until the send completes or fails.
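
Before going through the collected examples, here is a minimal, self-contained sketch of the three common ways to call send(): fire-and-forget, synchronous (blocking on the returned Future), and asynchronous with a Callback. The broker address localhost:9092, the topic name demo-topic and the class name ProducerSendDemo are placeholders, not taken from any of the projects below.

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSendDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // try-with-resources closes the producer and flushes any buffered records
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      ProducerRecord<String, String> record =
          new ProducerRecord<>("demo-topic", "key", "value"); // placeholder topic

      // 1. Fire-and-forget: ignore the returned Future.
      producer.send(record);

      // 2. Synchronous: block until the broker acknowledges the write.
      Future<RecordMetadata> future = producer.send(record);
      RecordMetadata metadata = future.get();
      System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());

      // 3. Asynchronous with a Callback: invoked when the send completes or fails.
      producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata md, Exception exception) {
          if (exception != null) {
            exception.printStackTrace();
          } else {
            System.out.println("acked at offset " + md.offset());
          }
        }
      });
    }
  }
}

Whichever form is used, the producer should be closed (here via try-with-resources) so that buffered records are flushed before the application exits.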

Code examples

Code example source: QNJR-GROUP/EasyTransaction

public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String, byte[]> record) {
    return kafkaProducer.send(record);
}

Code example source: alibaba/canal

private void produce(String topicName, int partition, FlatMessage flatMessage)
    throws ExecutionException, InterruptedException {
  ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
    partition,
    null,
    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
  if (kafkaProperties.getTransaction()) {
    // transactional mode: send asynchronously, no need to block on the Future
    producer2.send(record);
  } else {
    // non-transactional mode: block until the broker acknowledges the record
    producer2.send(record).get();
  }
}

Code example source: line/armeria

@Override
public void log(RequestLog log) {
  final V value = valueExtractor.apply(log);
  if (value == null) {
    return;
  }
  final K key = keyExtractor.apply(log);
  final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, value);
  producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
      logger.warn("Failed to send a record to Kafka: {}", producerRecord, exception);
    }
  });
}

Code example source: jmxtrans/jmxtrans

@Override
protected void internalWrite(Server server, Query query, ImmutableList<Result> results) throws Exception {
  for (Result result : results) {
    log.debug("Query result: [{}]", result);
    String message = resultSerializer.serialize(server, query, result);
    for (String topic : this.topics) {
      log.debug("Topic: [{}] ; Kafka Message: [{}]", topic, message);
      producer.send(new ProducerRecord<String, String>(topic, message));
    }
  }
}

Code example source: line/armeria

@Override
protected void writeLog(RequestLog log, L structuredLog) {
  final byte[] key = keySelector.selectKey(log, structuredLog);
  final ProducerRecord<byte[], L> producerRecord = new ProducerRecord<>(topic, key, structuredLog);
  producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
      logger.warn("failed to send service log to Kafka {}", producerRecord, exception);
    }
  });
}

Code example source: jmxtrans/jmxtrans

@Override
public void doWrite(Server server, Query query, Iterable<Result> results) throws Exception {
  for (Result result : results) {
    String message = resultSerializer.serialize(server, query, result);
    if (message != null) {
      producer.send(new ProducerRecord<String, String>(topic, message));
    }
  }
}

Code example source: apache/incubator-gobblin

@Override
public Future<WriteResponse> write(final D record, final WriteCallback callback) {
 return new WriteResponseFuture<>(this.producer.send(new ProducerRecord<String, D>(topic, record), new Callback() {
  @Override
  public void onCompletion(final RecordMetadata metadata, Exception exception) {
   if (exception != null) {
    callback.onFailure(exception);
   } else {
    callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
   }
  }
 }), WRITE_RESPONSE_WRAPPER);
}

Code example source: OryxProject/oryx

@Override
public void send(K key, M message) {
 getProducer().send(new ProducerRecord<>(topic, key, message));
}

Code example source: openzipkin/brave

@Benchmark public RecordMetadata send_baseCase() throws Exception {
 return producer.send(record).get();
}

Code example source: apache/incubator-druid

private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
{
 ObjectContainer<String> objectToSend;
 try {
  while (true) {
   objectToSend = recordQueue.take();
   producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
  }
 }
 catch (InterruptedException e) {
  log.warn(e, "Failed to take record from queue!");
 }
}

Code example source: apache/kylin

protected void send(String topic, Record record, Callback callback) {
  producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback);
}

Code example source: confluentinc/ksql

@Test
public void shouldSendCommandCorrectly() throws Exception {
 // When
 commandTopic.send(commandId1, command1);
 // Then
 verify(commandProducer).send(new ProducerRecord<>(COMMAND_TOPIC_NAME, 0, commandId1, command1));
 verify(future).get();
}

Code example source: confluentinc/ksql

@Before
@SuppressWarnings("unchecked")
public void setup() {
 commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandProducer);
 when(commandProducer.send(any(ProducerRecord.class))).thenReturn(future);
}

Code example source: line/armeria

@Test
public void withKeyExtractor() {
  final RequestLog log = mock(RequestLog.class);
  when(log.authority()).thenReturn("kawamuray");
  when(log.decodedPath()).thenReturn("kyuto");
  final KafkaAccessLogWriter<String, String> service =
      new KafkaAccessLogWriter<>(producer, TOPIC_NAME,
                    RequestLog::decodedPath, RequestLog::authority);
  service.log(log);
  verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  final ProducerRecord<String, String> record = captor.getValue();
  assertThat(record.key()).isEqualTo("kyuto");
  assertThat(record.value()).isEqualTo("kawamuray");
}

Code example source: line/armeria

@Test
public void withoutKeyExtractor() {
  final RequestLog log = mock(RequestLog.class);
  when(log.authority()).thenReturn("kawamuray");
  final KafkaAccessLogWriter<String, String> service =
      new KafkaAccessLogWriter<>(producer, TOPIC_NAME, RequestLog::authority);
  service.log(log);
  verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  final ProducerRecord<String, String> record = captor.getValue();
  assertThat(record.key()).isNull();
  assertThat(record.value()).isEqualTo("kawamuray");
}

Code example source: line/armeria

@Test
public void testWithKeySelector() {
  final KafkaStructuredLoggingServiceExposed service = new KafkaStructuredLoggingServiceExposed(
      producer, (res, log) -> log.name.getBytes(), false);
  final SimpleStructuredLog log = new SimpleStructuredLog("kawamuray");
  service.writeLog(null, log);
  verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  final ProducerRecord<byte[], SimpleStructuredLog> record = captor.getValue();
  assertThat(record.key()).isNotNull();
  assertThat(new String(record.key())).isEqualTo(log.name);
  assertThat(record.value()).isEqualTo(log);
}

Code example source: line/armeria

@Test
public void testServiceWithoutKeySelector() {
  final KafkaStructuredLoggingServiceExposed service =
      new KafkaStructuredLoggingServiceExposed(producer, null, false);
  final SimpleStructuredLog log = new SimpleStructuredLog("kawamuray");
  service.writeLog(null, log);
  verify(producer, times(1)).send(captor.capture(), any(Callback.class));
  final ProducerRecord<byte[], SimpleStructuredLog> record = captor.getValue();
  assertThat(record.key()).isNull();
  assertThat(record.value()).isEqualTo(log);
}

Code example source: apache/kafka

@Test
public void testSendToInvalidTopic() throws Exception {
  Map<String, Object> configs = new HashMap<>();
  configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
  Time time = new MockTime();
  MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
  Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  metadata.update(initialUpdateResponse, time.milliseconds());
  MockClient client = new MockClient(time, metadata);
  Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
      metadata, client, null, time);
  String invalidTopicName = "topic abc"; // Invalid topic name due to space
  ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
  List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
  topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
      invalidTopicName, false, Collections.emptyList()));
  MetadataResponse updateResponse = new MetadataResponse(
      new ArrayList<>(initialUpdateResponse.brokers()),
      initialUpdateResponse.clusterId(),
      initialUpdateResponse.controller().id(),
      topicMetadata);
  client.prepareMetadataUpdate(updateResponse);
  Future<RecordMetadata> future = producer.send(record);
  assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
      metadata.fetch().invalidTopics());
  TestUtils.assertFutureError(future, InvalidTopicException.class);
  producer.close(Duration.ofMillis(0));
}
