org.apache.kafka.clients.producer.Producer.close()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(296)

本文整理了Java中org.apache.kafka.clients.producer.Producer.close()方法的一些代码示例,展示了Producer.close()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.close()方法的具体详情如下:
包路径:org.apache.kafka.clients.producer.Producer
类名称:Producer
方法名:close

Producer.close介绍

[英]Close this producer
[中]关闭这个制作人

代码示例

代码示例来源:origin: apache/storm

@Override
public void cleanup() {
  producer.close();
}

代码示例来源:origin: OryxProject/oryx

@Override
public synchronized void close() {
 if (producer != null) {
  producer.close();
 }
}

代码示例来源:origin: line/armeria

@Override
  protected void close() {
    if (needToCloseProducer) {
      producer.close();
    }
  }
}

代码示例来源:origin: OryxProject/oryx

@Override
public synchronized void close() {
 if (producer != null) {
  producer.close();
 }
}

代码示例来源:origin: apache/nifi

/**
 * Will close the underlying {@link KafkaProducer}
 */
@Override
public void close() {
  this.kafkaProducer.close();
}

代码示例来源:origin: jmxtrans/jmxtrans

@Override
  public void close() {
    producer.close();
  }
}

代码示例来源:origin: jmxtrans/jmxtrans

@Override
  public void close() {
    producer.close();
  }
}

代码示例来源:origin: apache/kylin

public void close() {
    producer.close();
  }
}

代码示例来源:origin: apache/kafka

/**
   * See {@link KafkaProducer#close(Duration)}
   */
  void close(Duration timeout);
}

代码示例来源:origin: alibaba/canal

@Override
public void stop() {
  try {
    logger.info("## stop the kafka producer");
    if (producer != null) {
      producer.close();
    }
    if (producer2 != null) {
      producer2.close();
    }
  } catch (Throwable e) {
    logger.warn("##something goes wrong when stopping kafka producer:", e);
  } finally {
    logger.info("## kafka producer is down.");
  }
}

代码示例来源:origin: apache/incubator-gobblin

@Override
public void close()
  throws IOException {
 log.debug("Close called");
 this.producer.close();
}

代码示例来源:origin: apache/incubator-druid

@Override
 @LifecycleStop
 public void close()
 {
  scheduler.shutdownNow();
  producer.close();
 }
}

代码示例来源:origin: confluentinc/ksql

public void close() {
  commandConsumer.wakeup();
  commandConsumer.close();
  commandProducer.close();
 }
}

代码示例来源:origin: apache/kafka

@Test
public void closeShouldBeIdempotent() {
  Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
  producer.close();
  producer.close();
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldCloseAllResources() {
 // When:
 commandTopic.close();
 //Then:
 final InOrder ordered = inOrder(commandConsumer);
 ordered.verify(commandConsumer).wakeup();
 ordered.verify(commandConsumer).close();
 verify(commandProducer).close();
}

代码示例来源:origin: line/armeria

@Test
  public void closeProducerWhenRequested() {
    final KafkaAccessLogWriter<String, String> service =
        new KafkaAccessLogWriter<>(producer, TOPIC_NAME, log -> "");

    service.shutdown().join();
    verify(producer, times(1)).close();
  }
}

代码示例来源:origin: line/armeria

@Test
  public void testDoNotCloseProducerWhenNotRequested() {
    final KafkaStructuredLoggingServiceExposed service =
        new KafkaStructuredLoggingServiceExposed(producer, null, false);

    service.close();
    verify(producer, times(0)).close();
  }
}

代码示例来源:origin: line/armeria

@Test
public void testCloseProducerWhenRequested() {
  final KafkaStructuredLoggingServiceExposed service =
      new KafkaStructuredLoggingServiceExposed(producer, null, true);
  service.close();
  verify(producer, times(1)).close();
}

代码示例来源:origin: apache/kafka

@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
  Map<String, Object> configs = new HashMap<>();
  configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
  configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
  configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  Time time = new MockTime();
  MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
  Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  metadata.update(initialUpdateResponse, time.milliseconds());
  MockClient client = new MockClient(time, metadata);
  Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
      metadata, client, null, time);
  try {
    producer.initTransactions();
  } catch (TimeoutException e) {
    // expected
  }
  // other transactional operations should not be allowed if we catch the error after initTransactions failed
  try {
    producer.beginTransaction();
  } finally {
    producer.close(Duration.ofMillis(0));
  }
}

代码示例来源:origin: apache/kafka

@Test
public void testSendToInvalidTopic() throws Exception {
  Map<String, Object> configs = new HashMap<>();
  configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
  configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
  Time time = new MockTime();
  MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
  Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
  metadata.update(initialUpdateResponse, time.milliseconds());
  MockClient client = new MockClient(time, metadata);
  Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
      metadata, client, null, time);
  String invalidTopicName = "topic abc"; // Invalid topic name due to space
  ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
  List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
  topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
      invalidTopicName, false, Collections.emptyList()));
  MetadataResponse updateResponse = new MetadataResponse(
      new ArrayList<>(initialUpdateResponse.brokers()),
      initialUpdateResponse.clusterId(),
      initialUpdateResponse.controller().id(),
      topicMetadata);
  client.prepareMetadataUpdate(updateResponse);
  Future<RecordMetadata> future = producer.send(record);
  assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
      metadata.fetch().invalidTopics());
  TestUtils.assertFutureError(future, InvalidTopicException.class);
  producer.close(Duration.ofMillis(0));
}

相关文章