org.apache.kafka.clients.producer.Producer类的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(8.9k)|赞(0)|评价(0)|浏览(196)

本文整理了Java中org.apache.kafka.clients.producer.Producer类的一些代码示例,展示了Producer类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Producer类的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer

Producer介绍

[英]The interface for the KafkaProducer
[中]KafkaProducer 的接口

代码示例

代码示例来源:origin: QNJR-GROUP/EasyTransaction

/**
 * Passes the given record to {@code kafkaProducer.send} and returns the resulting future
 * without waiting on it.
 *
 * @param record the record to send, with a {@code String} key and a {@code byte[]} value
 * @return the future returned by {@code kafkaProducer.send(record)}
 */
public Future<RecordMetadata> publishKafkaMessage(ProducerRecord<String,byte[]> record){
    return kafkaProducer.send(record);
  }
}

代码示例来源:origin: apache/storm

/** Closes {@code producer}; this is the only action the cleanup hook performs. */
@Override
public void cleanup() {
  producer.close();
}

代码示例来源:origin: uber-common/jvm-profiler

/**
 * Flushes and closes the producer, then clears the reference.
 *
 * <p>Does nothing when there is no producer. The whole operation runs while holding this
 * object's monitor. {@code close()} is attempted even when {@code flush()} throws, so a failed
 * flush does not leave the producer open.
 */
@Override
public void close() {
  synchronized (this) {
    if (producer == null) {
      return;
    }
    try {
      producer.flush();
    } finally {
      // Runs whether or not flush() succeeded; the reference is cleared only after close()
      // returns normally, matching the previous behaviour on a failed close().
      producer.close();
      producer = null;
    }
  }
}

代码示例来源:origin: apache/nifi

/**
 * Begins a producer transaction when {@code useTransactions} is set; otherwise does nothing.
 *
 * <p>{@code initTransactions()} is called only while {@code transactionsInitialized} is still
 * false, and the flag is set right after that call returns. {@code activeTransaction} is set
 * once {@code producer.beginTransaction()} has returned.
 */
void beginTransaction() {
  if (useTransactions) {
    if (!transactionsInitialized) {
      producer.initTransactions();
      transactionsInitialized = true;
    }

    producer.beginTransaction();
    activeTransaction = true;
  }
}

代码示例来源:origin: spring-projects/spring-kafka

// Excerpt from a larger test: producer1 and inOrder are declared outside the lines shown here.
// The raw Producer.class mock is assigned to a parameterized type, hence the suppression.
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);
  // Expected call order: producer1 begins, then producer2 begins, commits and closes,
  // and only after that producer1 commits and closes.
  inOrder.verify(producer1).beginTransaction();
  inOrder.verify(producer2).beginTransaction();
  inOrder.verify(producer2).commitTransaction();
  inOrder.verify(producer2).close();
  inOrder.verify(producer1).commitTransaction();
  inOrder.verify(producer1).close();

代码示例来源:origin: apache/kafka

try {
  Future<?> future = executor.submit(() -> {
    producer.send(new ProducerRecord<>("topic", "key", "value"));
    try {
      producer.close();
      fail("Close should block and throw.");
    } catch (Exception e) {

代码示例来源:origin: uber-common/jvm-profiler

/**
 * Serializes the metrics to JSON and sends them as a UTF-8 encoded record to the topic
 * resolved for the given profiler.
 *
 * <p>When {@code syncMode} is set, the producer is flushed and the method waits for the send to
 * complete before returning.
 *
 * @param profilerName name passed to {@code getTopic} to resolve the target topic
 * @param metrics metrics to serialize and send
 * @throws RuntimeException in sync mode, if the send fails or the wait is interrupted; the
 *     original exception is kept as the cause
 */
@Override
public void report(String profilerName, Map<String, Object> metrics) {
  ensureProducer();
  String topicName = getTopic(profilerName);

  String str = JsonUtils.serialize(metrics);
  byte[] message = str.getBytes(StandardCharsets.UTF_8);
  Future<RecordMetadata> future = producer.send(
      new ProducerRecord<String, byte[]>(topicName, message));
  if (syncMode) {
    producer.flush();
    try {
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still see the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}

代码示例来源:origin: confluentinc/kafka-streams-examples

/**
 * Writes the given records to a Kafka topic one at a time, waiting for each send to complete
 * before issuing the next.
 *
 * <p>A producer is created from {@code producerConfig} for this call only. It is closed before
 * the method returns, including when a send fails or the wait is interrupted.
 *
 * @param topic          Kafka topic to write the data records to
 * @param records        Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K>            Key type of the data records
 * @param <V>            Value type of the data records
 * @throws ExecutionException   if a send completed exceptionally
 * @throws InterruptedException if the calling thread was interrupted while waiting for a send
 */
public static <K, V> void produceKeyValuesSynchronously(
  String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
  throws ExecutionException, InterruptedException {
 // try-with-resources closes the producer on every exit path, not only the successful one.
 try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
  for (KeyValue<K, V> record : records) {
   Future<RecordMetadata> f = producer.send(
     new ProducerRecord<>(topic, record.key, record.value));
   f.get();
  }
  producer.flush();
 }
}

代码示例来源:origin: apache/kafka

/**
 * Builds a transactional producer against a mock client with a 5 ms max block time, tolerates a
 * {@code TimeoutException} from {@code initTransactions()}, and then calls
 * {@code beginTransaction()}. The test passes only if a {@code KafkaException} escapes the
 * method; the producer is closed with a zero timeout in either case.
 */
@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
  Map<String, Object> configs = new HashMap<>();
  configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
  // Very short max block time for the producer under test.
  configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
  configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  Time time = new MockTime();
  MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
  Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  metadata.update(initialUpdateResponse, time.milliseconds());
  // The mock client and mock time are handed to the producer, so no real broker is contacted.
  MockClient client = new MockClient(time, metadata);
  Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
      metadata, client, null, time);
  try {
    producer.initTransactions();
  } catch (TimeoutException e) {
    // expected
  }
  // other transactional operations should not be allowed if we catch the error after initTransactions failed
  try {
    producer.beginTransaction();
  } finally {
    // Close without waiting, whether or not beginTransaction() threw.
    producer.close(Duration.ofMillis(0));
  }
}

代码示例来源:origin: apache/incubator-druid

/** Delegates to {@code producer.flush()}. */
@Override
public void flush()
{
 producer.flush();
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11

/**
 * Creates a producer from {@code producerFB}, looks up the partitions of the destination, and
 * releases both the producer and the factory before returning.
 *
 * <p>The producer is closed and the factory destroyed even when the partition lookup throws.
 * NOTE(review): the factory is now also destroyed when {@code createProducer()} itself fails;
 * this assumes the factory exists only for this lookup, so confirm against the enclosing code.
 *
 * @return the partition metadata reported for {@code destination.getName()}
 * @throws Exception if creating the producer, the lookup, or destroying the factory fails
 */
@Override
public Collection<PartitionInfo> call() throws Exception {
  try (Producer<byte[], byte[]> producer = producerFB.createProducer()) {
    return producer.partitionsFor(destination.getName());
  } finally {
    // Runs after the producer has been closed, keeping the original close-then-destroy order.
    ((DisposableBean) producerFB).destroy();
  }
}

代码示例来源:origin: apache/nifi

/**
 * Completes the current publish: flushes the producer, commits the transaction if one is
 * active, waits up to {@code maxAckWaitMillis} on the tracker, and returns its result.
 *
 * <p>When {@code tracker} is null, returns {@code PublishResult.EMPTY} if no messages were
 * sent; otherwise rolls back and throws. After waiting, {@code tracker} is always reset to null.
 *
 * @return the tracker's publish result, or the result of failing all outstanding items when the
 *     wait is interrupted or times out
 * @throws IllegalStateException if {@code tracker} is null although messages were sent
 */
public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() != 0L) {
      rollback();
      throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
    }
    return PublishResult.EMPTY;
  }

  producer.flush();

  if (activeTransaction) {
    producer.commitTransaction();
    activeTransaction = false;
  }

  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException interrupted) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    // Keep the interrupt visible to the caller.
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(interrupted);
  } catch (final TimeoutException timedOut) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(timedOut);
  } finally {
    tracker = null;
  }
}

代码示例来源:origin: apache/kylin

/**
 * Calls {@code producer.partitionsFor(topic)} and discards the result.
 * NOTE(review): presumably used to trigger a metadata fetch for the topic, as the method name
 * suggests; confirm against the callers.
 *
 * @param topic the topic to query partitions for
 */
public void tryFetchMetadataFor(String topic) {
  producer.partitionsFor(topic);
}

代码示例来源:origin: reactor/reactor-kafka

/**
 * Tests invocation of methods on KafkaProducer using {@link KafkaSender#doOnProducer(java.util.function.Function)}.
 * Each call hands one producer operation to {@code testProducerMethod}.
 */
@Test
public void producerMethods() {
  // metrics() is expected to be empty here.
  testProducerMethod(p -> assertEquals(0, p.metrics().size()));
  // The topic is expected to report exactly two partitions.
  testProducerMethod(p -> assertEquals(2, p.partitionsFor(topic).size()));
  // flush() has no return value to assert on; it is only invoked.
  testProducerMethod(p -> p.flush());
}

代码示例来源:origin: openzipkin/brave

/** Forwards the call unchanged to {@code delegate.initTransactions()}. */
@Override public void initTransactions() {
 delegate.initTransactions();
}

代码示例来源:origin: spring-projects/spring-kafka

/**
 * Runs a {@code DeadLetterPublishingRecoverer} inside a {@code TransactionTemplate} callback and
 * verifies that only the first producer handed out by the factory takes part in a transaction,
 * and that the template never calls {@code executeInTransaction}.
 */
@Test
public void testDeadLetterPublisherWhileTransactionActive() {
  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer1 = mock(Producer.class);
  @SuppressWarnings("unchecked")
  Producer<Object, Object> producer2 = mock(Producer.class);
  producer1.initTransactions();
  @SuppressWarnings("unchecked")
  ProducerFactory<Object, Object> pf = mock(ProducerFactory.class);
  given(pf.transactionCapable()).willReturn(true);
  // The first createProducer() call returns producer1; later calls return producer2.
  given(pf.createProducer()).willReturn(producer1).willReturn(producer2);
  // Spy so that calls on the template itself can be verified below.
  KafkaTemplate<Object, Object> template = spy(new KafkaTemplate<>(pf));
  template.setDefaultTopic(STRING_KEY_TOPIC);
  KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
  new TransactionTemplate(tm).execute(s -> {
    new DeadLetterPublishingRecoverer(template).accept(
        new ConsumerRecord<>(STRING_KEY_TOPIC, 0, 0L, "key", "foo"),
        new RuntimeException("foo"));
    return null;
  });
  // producer1 must have begun, committed and been closed.
  verify(producer1).beginTransaction();
  verify(producer1).commitTransaction();
  verify(producer1).close();
  // No transaction may be started on producer2 or through executeInTransaction.
  verify(producer2, never()).beginTransaction();
  verify(template, never()).executeInTransaction(any());
}

代码示例来源:origin: apache/kafka

producer.send(new ProducerRecord<>(topicName, "key", "value"));
    fail();
  } catch (Exception e) {
producer.close(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
assertEquals(KafkaException.class, sendException.get().getClass());

代码示例来源:origin: linkedin/cruise-control

final AtomicInteger metricSampleCount = new AtomicInteger(0);
for (PartitionMetricSample sample : samples.partitionMetricSamples()) {
 _producer.send(new ProducerRecord<>(_partitionMetricSampleStoreTopic, null, sample.sampleTime(), null, sample.toBytes()),
         new Callback() {
          @Override
 _producer.send(new ProducerRecord<>(_brokerMetricSampleStoreTopic, sample.toBytes()),
         new Callback() {
          @Override
_producer.flush();
if (LOG.isDebugEnabled()) {
 LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",

代码示例来源:origin: apache/incubator-gobblin

/**
 * Delegates to {@code this.producer.flush()}.
 *
 * @throws IOException declared in the signature; nothing visible in this body throws it
 */
@Override
public void flush()
  throws IOException {
  this.producer.flush();
}

代码示例来源:origin: apache/nifi

/**
 * Completes the current publish: flushes the producer, commits the transaction if one is
 * active, waits up to {@code maxAckWaitMillis} on the tracker, and returns its result.
 *
 * <p>When {@code tracker} is null, returns {@code PublishResult.EMPTY} if no messages were
 * sent; otherwise rolls back and throws. After waiting, {@code tracker} is always reset to null.
 *
 * @return the tracker's publish result, or the result of failing all outstanding items when the
 *     wait is interrupted or times out
 * @throws IllegalStateException if {@code tracker} is null although messages were sent
 */
public PublishResult complete() {
  if (tracker == null) {
    if (messagesSent.get() != 0L) {
      rollback();
      throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
    }
    return PublishResult.EMPTY;
  }

  producer.flush();

  if (activeTransaction) {
    producer.commitTransaction();
    activeTransaction = false;
  }

  try {
    tracker.awaitCompletion(maxAckWaitMillis);
    return tracker.createPublishResult();
  } catch (final InterruptedException interrupted) {
    logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    // Keep the interrupt visible to the caller.
    Thread.currentThread().interrupt();
    return tracker.failOutstanding(interrupted);
  } catch (final TimeoutException timedOut) {
    logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
    return tracker.failOutstanding(timedOut);
  } finally {
    tracker = null;
  }
}

相关文章