kafka.javaapi.producer.Producer.close()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(4.9k)|赞(0)|评价(0)|浏览(134)

本文整理了Java中kafka.javaapi.producer.Producer.close()方法的一些代码示例,展示了Producer.close()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Producer.close()方法的具体详情如下:
包路径:kafka.javaapi.producer.Producer
类名称:Producer
方法名:close

Producer.close介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-pinot

public void shutdown() {
 keepIndexing = false;
 avroDataStream = null;
 producer.close();
 producer = null;
 service.shutdown();
}

代码示例来源:origin: org.apache.kafka/kafka_2.9.2

public void awaitShutdown() {
  try {
   shutdownComplete.await();
   producer.close();
   logger.info("Producer thread " + threadName + " shutdown complete");
  } catch(InterruptedException ie) {
   logger.warn("Interrupt during shutdown of ProducerThread", ie);
  }
 }
}

代码示例来源:origin: linkedin/camus

} finally {
 if (producer != null) {
  producer.close();

代码示例来源:origin: linkedin/camus

private static List<Message> writeKafka(String topic, int numOfMessages) {
 List<Message> messages = new ArrayList<Message>();
 List<KeyedMessage<String, String>> kafkaMessages = new ArrayList<KeyedMessage<String, String>>();
 for (int i = 0; i < numOfMessages; i++) {
  Message msg = new Message(RANDOM.nextInt());
  messages.add(msg);
  kafkaMessages.add(new KeyedMessage<String, String>(topic, Integer.toString(i), gson.toJson(msg)));
 }
 Properties producerProps = cluster.getProps();
 producerProps.setProperty("serializer.class", StringEncoder.class.getName());
 producerProps.setProperty("key.serializer.class", StringEncoder.class.getName());
 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(producerProps));
 try {
  producer.send(kafkaMessages);
 } finally {
  producer.close();
 }
 return messages;
}

代码示例来源:origin: linkedin/camus

private Producer mockProducerSendThrowsException() {
 Producer mockProducer = EasyMock.createMock(Producer.class);
 mockProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).anyTimes();
 mockProducer.close();
 EasyMock.expectLastCall().anyTimes();
 EasyMock.replay(mockProducer);
 return mockProducer;
}

代码示例来源:origin: HuaweiBigData/StreamCQL

/**
 * {@inheritDoc}
 */
@Override
public void destroy()
  throws StreamingException
{
  if (producer != null)
  {
    producer.close();
  }
}

代码示例来源:origin: gwenshap/kafka-examples

@Override
  public void close() {
    producer.close();
  }
}

代码示例来源:origin: apache/eagle

@Override
public void cleanup() {
  if (this.producer != null) {
    this.producer.close();
  }
}

代码示例来源:origin: com.github.hackerwin7/jlib-utils

/**
 * close producer client
 */
public void close() {
  if(producer != null)
    producer.close();
}

代码示例来源:origin: org.apache.apex/malhar-contrib

/**
 * Implement Component Interface.
 */
@Override
public void teardown()
{
 producer.close();
}

代码示例来源:origin: linkedin/camus

private Producer mockProducerThirdSendSucceed() {
 Producer mockProducer = EasyMock.createMock(Producer.class);
 mockProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().andThrow(new RuntimeException("dummyException")).times(2);
 mockProducer.send((KeyedMessage) EasyMock.anyObject());
 EasyMock.expectLastCall().times(1);
 mockProducer.close();
 EasyMock.expectLastCall().anyTimes();
 EasyMock.replay(mockProducer);
 return mockProducer;
}

代码示例来源:origin: apache/eagle

@Override
  public void close() throws IOException {
    if (this.producer != null) {
      LOGGER.info("Closing kafka producer for stream {}", this.streamId);
      this.producer.close();
    }
  }
}

代码示例来源:origin: org.apache.apex/malhar-contrib

public void stopServer() throws IOException {
  serverSocket.close();
  connectionSocket.close();
  producer.close();
 }
}

代码示例来源:origin: apache/apex-malhar

public void stopServer() throws IOException
 {
  serverSocket.close();
  connectionSocket.close();
  producer.close();
 }
}

代码示例来源:origin: locationtech/geowave

public synchronized void close() {
  for (final Producer<String, T> producer : cachedProducers.values()) {
   try {
    producer.close();
   } catch (final Exception e) {
    LOGGER.warn("Unable to close kafka producer", e);
   }
  }
  cachedProducers.clear();
 }
}

代码示例来源:origin: com.ebay.jetstream/jetstream-messaging

private void closeProducers(List<Producer<byte[], byte[]>> producers) {
  Iterator<Producer<byte[], byte[]>> it = producers.iterator();
  while (it.hasNext()) {
    Producer<byte[], byte[]> producer = it.next();
    if (producer != null) {
      producer.close();
      producer = null;
    }
    it.remove();
  }
}

代码示例来源:origin: org.apache.twill/twill-core

@Override
 public void run() {
  // Call from cancel() through executor only.
  cancelChangeListener.cancel();
  Producer<Integer, ByteBuffer> kafkaProducer = producer.get();
  kafkaProducer.close();
  executor.shutdownNow();
 }
}

代码示例来源:origin: thilinamb/flume-ng-kafka-sink

@Override
public synchronized void stop() {
  producer.close();
  super.stop();
}

代码示例来源:origin: com.netflix.suro/suro-kafka

@Override
protected void innerClose() {
  super.innerClose();
  producer.close();
}

代码示例来源:origin: buildlackey/cep

public void shutdown() {
  producer.close();
  try {               // Give producer some time...
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  kafkaServer.shutdown();
  kafkaServer.awaitShutdown();
}

相关文章

微信公众号

最新文章

更多