org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties类的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(19.4k)|赞(0)|评价(0)|浏览(108)

本文整理了Java中org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties类的一些代码示例,展示了KafkaConsumerProperties类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KafkaConsumerProperties类的具体详情如下:
包路径:org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties
类名称:KafkaConsumerProperties

KafkaConsumerProperties介绍

[英]Extended consumer properties for Kafka binder.
[中]Kafka 绑定器(binder)的扩展消费者属性。

代码示例

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core

/**
 * Provisions the dead-letter topic for this consumer binding when DLQ support is enabled.
 *
 * @param adminClient Kafka admin client used for topic creation
 * @param name        destination (topic) name of the binding
 * @param group       consumer group of the binding
 * @param properties  extended consumer properties carrying the Kafka-specific extension
 * @param anonymous   whether the subscription is anonymous (no explicit group)
 * @param partitions  partition count to provision the DLQ topic with
 * @return the consumer destination including the DLQ topic, or {@code null} when no DLQ applies
 * @throws ProvisioningException when topic creation fails for any non-{@link Error} reason
 */
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group,
                      ExtendedConsumerProperties<KafkaConsumerProperties> properties,
                      boolean anonymous, int partitions) {
  KafkaConsumerProperties extension = properties.getExtension();
  // DLQ topics are never provisioned for anonymous (group-less) subscriptions.
  if (!extension.isEnableDlq() || anonymous) {
    return null;
  }
  // Fall back to the conventional "error.<destination>.<group>" name when none is configured.
  String dlqTopic = StringUtils.hasText(extension.getDlqName())
      ? extension.getDlqName()
      : "error." + name + "." + group;
  try {
    createTopicAndPartitions(adminClient, dlqTopic, partitions,
        extension.isAutoRebalanceEnabled(), extension.getAdmin());
  }
  catch (Error error) {
    // JVM errors propagate untouched.
    throw error;
  }
  catch (Throwable throwable) {
    throw new ProvisioningException("provisioning exception", throwable);
  }
  return new KafkaConsumerDestination(name, partitions, dlqTopic);
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

/**
 * Determines whether offsets should be committed for records whose processing failed.
 *
 * @param properties extended consumer properties carrying the Kafka-specific extension
 * @return the explicit {@code autoCommitOnError} setting when present; otherwise {@code true}
 *         only when offsets are auto-committed and a DLQ absorbs the failed record
 */
private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
  KafkaConsumerProperties extension = properties.getExtension();
  Boolean explicitSetting = extension.getAutoCommitOnError();
  if (explicitSetting != null) {
    // An explicit user setting always wins over the derived default.
    return explicitSetting;
  }
  return extension.isAutoCommitOffset() && extension.isEnableDlq();
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Builds the {@link ConsumerFactory} for a binding, layering configuration in increasing
 * precedence: binder defaults, binder-wide merged configuration, then per-binding configuration.
 *
 * @param anonymous          whether the subscription is anonymous (affects offset reset)
 * @param consumerGroup      the consumer group id to use
 * @param consumerProperties extended consumer properties carrying the Kafka-specific extension
 * @return a {@link DefaultKafkaConsumerFactory} configured for byte-array payloads
 */
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
  Map<String, Object> consumerConfig = new HashMap<>();
  // Binder defaults: raw byte payloads, offsets managed by the binder (auto-commit off).
  consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
  // Anonymous consumers start from the latest offset; named groups replay from the earliest.
  consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
  consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
  // Binder-wide configuration overrides the defaults above.
  Map<String, Object> mergedConfig = this.configurationProperties.mergedConsumerConfiguration();
  if (!ObjectUtils.isEmpty(mergedConfig)) {
    consumerConfig.putAll(mergedConfig);
  }
  if (ObjectUtils.isEmpty(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        this.configurationProperties.getKafkaConnectionString());
  }
  // Per-binding configuration has the highest precedence.
  if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
    consumerConfig.putAll(consumerProperties.getExtension().getConfiguration());
  }
  if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        consumerProperties.getExtension().getStartOffset().name());
  }
  return new DefaultKafkaConsumerFactory<>(consumerConfig);
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt of a larger listener-setup method — the method
// signature and several statements were lost in extraction; it does not compile as shown.
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
boolean anonymous = !StringUtils.hasText(group);
// DLQ support requires a stable consumer group; anonymous subscriptions have none.
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
boolean usingPatterns = extendedConsumerProperties.getExtension().isDestinationIsPattern();
Assert.isTrue(!usingPatterns || !extendedConsumerProperties.isMultiplex(),
    "Cannot use a pattern with multiplexed destinations; "
    + "use the regex pattern to specify multiple topics instead");
boolean groupManagement = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
if (!extendedConsumerProperties.isMultiplex()) {
  listenedPartitions.addAll(processTopic(consumerGroup, extendedConsumerProperties, consumerFactory,
    listenedPartitions);
// Anonymous or rebalance-enabled consumers rely on Kafka group management (topic/pattern
// subscription); otherwise fixed partition assignments are used (missing branch truncated).
final ContainerProperties containerProperties = anonymous
    || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
        ? usingPatterns
            ? new ContainerProperties(Pattern.compile(topics[0]))
containerProperties.setIdleEventInterval(extendedConsumerProperties.getExtension().getIdleEventInterval());
// With patterns the partition count is unknown, so concurrency is used as-is.
int concurrency = usingPatterns ? extendedConsumerProperties.getConcurrency()
    : Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
  messageListenerContainer.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.MANUAL);
  messageListenerContainer.getContainerProperties()

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Creates the dead-letter topic for a binding when DLQ support is enabled and the
 * subscription is not anonymous.
 *
 * @param adminClient Kafka admin client used for topic creation
 * @param name        destination (topic) name of the binding
 * @param group       consumer group of the binding
 * @param properties  extended consumer properties carrying the Kafka-specific extension
 * @param anonymous   whether the subscription is anonymous
 * @param partitions  partition count to provision the DLQ topic with
 * @return the consumer destination including the DLQ topic, or {@code null} when no DLQ applies
 * @throws ProvisioningException when topic creation fails for any non-{@link Error} reason
 */
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group,
                      ExtendedConsumerProperties<KafkaConsumerProperties> properties,
                      boolean anonymous, int partitions) {
  // No DLQ for anonymous subscriptions or when DLQ support is disabled.
  if (anonymous || !properties.getExtension().isEnableDlq()) {
    return null;
  }
  String configuredDlqName = properties.getExtension().getDlqName();
  // Default to the conventional "error.<destination>.<group>" topic name.
  String dlqTopic = StringUtils.hasText(configuredDlqName)
      ? configuredDlqName
      : "error." + name + "." + group;
  try {
    createTopicAndPartitions(adminClient, dlqTopic, partitions,
        properties.getExtension().isAutoRebalanceEnabled(), properties.getExtension().getTopic());
  }
  catch (Error error) {
    throw error;
  }
  catch (Throwable throwable) {
    throw new ProvisioningException("provisioning exception", throwable);
  }
  return new KafkaConsumerDestination(name, partitions, dlqTopic);
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11-core

/**
 * Provisions the consumer destination topic for a binding and, when DLQ support is enabled
 * for a non-anonymous group, its dead-letter topic as well.
 *
 * @param name       destination (topic) name; validated before provisioning
 * @param group      consumer group (blank/null means an anonymous subscription)
 * @param properties extended consumer properties carrying the Kafka-specific extension
 * @return the provisioned {@link ConsumerDestination}, with partition/DLQ details when
 *         auto-creation is enabled and admin utilities are available
 * @throws IllegalArgumentException when the instance count is zero
 */
@Override
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
  KafkaTopicUtils.validateTopicName(name);
  boolean anonymous = !StringUtils.hasText(group);
  Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
      "DLQ support is not available for anonymous subscriptions");
  if (properties.getInstanceCount() == 0) {
    throw new IllegalArgumentException("Instance count cannot be zero");
  }
  int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
  createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
  if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
    final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
        this.configurationProperties.getZkSessionTimeout(),
        this.configurationProperties.getZkConnectionTimeout(),
        JaasUtils.isZkSecurityEnabled());
    try {
      int partitions = adminUtilsOperation.partitionSize(name, zkUtils);
      if (properties.getExtension().isEnableDlq() && !anonymous) {
        String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
            properties.getExtension().getDlqName() : "error." + name + "." + group;
        createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
        return new KafkaConsumerDestination(name, partitions, dlqTopic);
      }
      return new KafkaConsumerDestination(name, partitions);
    }
    finally {
      // FIX: the ZooKeeper client was previously never closed, leaking the session
      // on every provisioning call; release it on all exit paths.
      zkUtils.close();
    }
  }
  return new KafkaConsumerDestination(name);
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt of a DLQ error-handler factory method — the
// signature and intermediate statements are missing; it does not compile as shown.
final ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaConsumerProperties kafkaConsumerProperties = properties.getExtension();
if (kafkaConsumerProperties.isEnableDlq()) {
  KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties.getDlqProducerProperties();
  // Prefer the transaction manager's producer factory when one is configured
  // (alternative branch truncated in this excerpt).
  ProducerFactory<?, ?> producerFactory = this.transactionManager != null
      ? this.transactionManager.getProducerFactory()
    // Default to the conventional "error.<topic>.<group>" DLQ name when none is configured.
    String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
        ? kafkaConsumerProperties.getDlqName() : "error." + record.topic() + "." + group;
    dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName);
  };

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11

// NOTE(review): truncated excerpt from the kafka11 binder's listener setup — the
// enclosing method and several statements are missing; it does not compile as shown.
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
    extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
    new Callable<Collection<PartitionInfo>>() {
// With group management (auto-rebalance) or a single instance, listen on all partitions.
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
    extendedConsumerProperties.getInstanceCount() == 1) {
  listenedPartitions = allPartitions;
    listenedPartitions);
final ContainerProperties containerProperties = anonymous
    || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
        ? new ContainerProperties(destination.getName())
        : new ContainerProperties(topicPartitionInitialOffsets);
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
  messageListenerContainer.getContainerProperties()
      .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
String[] trustedPackages = extendedConsumerProperties.getExtension().getTrustedPackages();
// NOTE(review): StringUtils.isEmpty on an array only detects null, not a
// zero-length array — confirm intent against the upstream source.
if (!StringUtils.isEmpty(trustedPackages)) {
  headerMapper.addTrustedPackages(trustedPackages);

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt — closing braces and intermediate statements
// were lost in extraction; shown for reference only.
private ConsumerDestination doProvisionConsumerDestination(final String name, final String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
  // Topic patterns are incompatible with DLQ provisioning: matched topics are unknown up front.
  if (properties.getExtension().isDestinationIsPattern()) {
    Assert.isTrue(!properties.getExtension().isEnableDlq(),
        "enableDLQ is not allowed when listening to topic patterns");
    if (this.logger.isDebugEnabled()) {
  Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
      "DLQ support is not available for anonymous subscriptions");
  if (properties.getInstanceCount() == 0) {
  ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
  // AdminClient is closed automatically via try-with-resources.
  try (AdminClient adminClient = createAdminClient()) {
    createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(),
        properties.getExtension().getTopic());
    if (this.configurationProperties.isAutoCreateTopics()) {
      DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core

// NOTE(review): truncated excerpt (kafka-core variant using getAdmin()) — closing
// braces and intermediate statements were lost in extraction; reference only.
private ConsumerDestination doProvisionConsumerDestination(final String name, final String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
  // Topic patterns cannot be combined with DLQ provisioning.
  if (properties.getExtension().isDestinationIsPattern()) {
    Assert.isTrue(!properties.getExtension().isEnableDlq(),
        "enableDLQ is not allowed when listening to topic patterns");
    if (this.logger.isDebugEnabled()) {
  Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
      "DLQ support is not available for anonymous subscriptions");
  if (properties.getInstanceCount() == 0) {
  ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
  // AdminClient is closed automatically via try-with-resources.
  try (AdminClient adminClient = createAdminClient()) {
    createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(),
        properties.getExtension().getAdmin());
    if (this.configurationProperties.isAutoCreateTopics()) {
      DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Provisions the input topics for a Kafka Streams consumer binding and, when the binder is
 * configured to send serde errors to a DLQ, registers a {@link KafkaStreamsDlqDispatch}
 * per input topic.
 *
 * @param name                          comma-delimited list of input topic names
 * @param group                         consumer group of the binding
 * @param context                       application context used to look up the DLQ sender bean
 * @param kafkaTopicProvisioner         provisioner that creates the input topics
 * @param binderConfigurationProperties binder-level configuration (serde error policy)
 * @param properties                    the Kafka Streams consumer properties for the binding
 * @param kafkaStreamsDlqDispatchers    registry of DLQ dispatchers, keyed by input topic
 */
static void prepareConsumerBinding(String name, String group, ApplicationContext context,
                    KafkaTopicProvisioner kafkaTopicProvisioner,
                    KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
                    ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties,
                    Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
  ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(
      properties.getExtension());
  // The sendToDlq serde-error policy implies DLQ support even if not set on the binding.
  if (binderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.sendToDlq) {
    extendedConsumerProperties.getExtension().setEnableDlq(true);
  }
  String[] inputTopics = StringUtils.commaDelimitedListToStringArray(name);
  for (String inputTopic : inputTopics) {
    kafkaTopicProvisioner.provisionConsumerDestination(inputTopic, group, extendedConsumerProperties);
  }
  if (extendedConsumerProperties.getExtension().isEnableDlq()) {
    // FIX: the SendToDlqAndContinue bean lookup is loop-invariant; it was previously
    // resolved from the context on every iteration.
    SendToDlqAndContinue sendToDlqAndContinue = context.getBean(SendToDlqAndContinue.class);
    KafkaConsumerProperties extension = extendedConsumerProperties.getExtension();
    // An explicitly configured DLQ name yields a single dispatcher shared by all input topics.
    KafkaStreamsDlqDispatch sharedDispatch = !StringUtils.isEmpty(extension.getDlqName())
        ? new KafkaStreamsDlqDispatch(extension.getDlqName(), binderConfigurationProperties, extension)
        : null;
    for (String inputTopic : inputTopics) {
      KafkaStreamsDlqDispatch dispatch = sharedDispatch;
      if (dispatch == null) {
        // No explicit DLQ name: each topic gets its own "error.<topic>.<group>" dispatcher.
        dispatch = new KafkaStreamsDlqDispatch("error." + inputTopic + "." + group,
            binderConfigurationProperties, extension);
      }
      sendToDlqAndContinue.addKStreamDlqDispatch(inputTopic, dispatch);
      kafkaStreamsDlqDispatchers.put(inputTopic, dispatch);
    }
  }
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt — the enclosing method and several statements
// are missing; it does not compile as shown.
Assert.isTrue(!anonymous || !consumerProperties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
source.setRawMessageHeader(consumerProperties.getExtension().isEnableDlq());
// A per-binding client.id in the extension configuration overrides the destination name.
String clientId = name;
if (consumerProperties.getExtension().getConfiguration().containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
  clientId = consumerProperties.getExtension().getConfiguration().get(ConsumerConfig.CLIENT_ID_CONFIG);

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt — the method's remaining branch and closing
// braces are missing; it does not compile as shown.
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
  // When DLQ is enabled, polled consumers reuse the standard DLQ error handler.
  if (properties.getExtension().isEnableDlq()) {
    return getErrorMessageHandler(destination, group, properties);

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11

// NOTE(review): truncated excerpt — surrounding method context is missing;
// it does not compile as shown.
final byte[] payload = record.value() != null
    ? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
// Default to the conventional "error.<destination>.<group>" DLQ name when none is configured.
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
    ? extendedConsumerProperties.getExtension().getDlqName()
    : "error." + destination.getName() + "." + group;
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(dlqName, record.partition(),

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

/**
 * Retrieves the partition metadata for {@code topic} via the provisioning provider,
 * using a short-lived consumer purely for the metadata lookup.
 *
 * @param topic                      topic whose partitions are queried
 * @param extendedConsumerProperties extended consumer properties (supplies the rebalance flag)
 * @param consumerFactory            factory used to build the throwaway metadata consumer
 * @param partitionCount             expected partition count passed through to the provider
 * @return the partition metadata for the topic
 */
private Collection<PartitionInfo> getPartitionInfo(String topic,
    final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
    final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
  boolean rebalanceEnabled = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
  return provisioningProvider.getPartitionsForTopic(partitionCount, rebalanceEnabled, () -> {
    // The consumer exists only for this metadata call; try-with-resources closes it.
    try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
      return metadataConsumer.partitionsFor(topic);
    }
  }, topic);
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

/**
 * Resolves the {@link KafkaHeaderMapper} for a binding: a user-provided bean when the
 * binder configuration names one, otherwise a {@link BinderHeaderMapper} that flags
 * messages carrying native Kafka headers.
 *
 * @param extendedConsumerProperties extended consumer properties (supplies trusted packages)
 * @return the header mapper to use; never {@code null}
 */
private KafkaHeaderMapper getHeaderMapper(
    final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
  KafkaHeaderMapper mapper = null;
  if (this.configurationProperties.getHeaderMapperBeanName() != null) {
    mapper = getApplicationContext().getBean(this.configurationProperties.getHeaderMapperBeanName(),
        KafkaHeaderMapper.class);
  }
  if (mapper == null) {
    BinderHeaderMapper headerMapper = new BinderHeaderMapper() {
      @Override
      public void toHeaders(Headers source, Map<String, Object> headers) {
        super.toHeaders(source, headers);
        // Mark the message so downstream code knows native Kafka headers were present.
        if (headers.size() > 0) {
          headers.put(BinderHeaders.NATIVE_HEADERS_PRESENT, Boolean.TRUE);
        }
      }
    };
    String[] trustedPackages = extendedConsumerProperties.getExtension().getTrustedPackages();
    // FIX: StringUtils.isEmpty(Object) only detects null (and empty CharSequence),
    // not a zero-length array; check null/length explicitly so an empty
    // trusted-packages array is skipped as intended.
    if (trustedPackages != null && trustedPackages.length > 0) {
      headerMapper.addTrustedPackages(trustedPackages);
    }
    mapper = headerMapper;
  }
  return mapper;
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Creates a dispatcher that publishes failed records to the given dead-letter topic,
 * using a producer built from the binding's DLQ producer properties.
 *
 * @param dlqName                            dead-letter topic this dispatcher sends to
 * @param kafkaBinderConfigurationProperties binder-level configuration for the producer factory
 * @param kafkaConsumerProperties            consumer extension supplying the DLQ producer properties
 */
KafkaStreamsDlqDispatch(String dlqName,
            KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
            KafkaConsumerProperties kafkaConsumerProperties) {
  ExtendedProducerProperties<KafkaProducerProperties> dlqProducerProperties =
      new ExtendedProducerProperties<>(kafkaConsumerProperties.getDlqProducerProperties());
  this.kafkaTemplate = new KafkaTemplate<>(
      getProducerFactory(dlqProducerProperties, kafkaBinderConfigurationProperties));
  this.dlqName = dlqName;
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt (duplicate of the listener-setup snippet above,
// from the released artifact) — signature and statements are missing; reference only.
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
boolean anonymous = !StringUtils.hasText(group);
// DLQ support requires a stable consumer group; anonymous subscriptions have none.
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
boolean usingPatterns = extendedConsumerProperties.getExtension().isDestinationIsPattern();
Assert.isTrue(!usingPatterns || !extendedConsumerProperties.isMultiplex(),
    "Cannot use a pattern with multiplexed destinations; "
    + "use the regex pattern to specify multiple topics instead");
boolean groupManagement = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
if (!extendedConsumerProperties.isMultiplex()) {
  listenedPartitions.addAll(processTopic(consumerGroup, extendedConsumerProperties, consumerFactory,
    listenedPartitions);
final ContainerProperties containerProperties = anonymous
    || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
        ? usingPatterns
            ? new ContainerProperties(Pattern.compile(topics[0]))
containerProperties.setIdleEventInterval(extendedConsumerProperties.getExtension().getIdleEventInterval());
// With patterns the partition count is unknown, so concurrency is used as-is.
int concurrency = usingPatterns ? extendedConsumerProperties.getConcurrency()
    : Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
  messageListenerContainer.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.MANUAL);
  messageListenerContainer.getContainerProperties()

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt of a DLQ error-handler factory method — the
// signature and intermediate statements are missing; it does not compile as shown.
final ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaConsumerProperties kafkaConsumerProperties = properties.getExtension();
if (kafkaConsumerProperties.isEnableDlq()) {
  KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties.getDlqProducerProperties();
  // Prefer the transaction manager's producer factory when one is configured
  // (alternative branch truncated in this excerpt).
  ProducerFactory<?, ?> producerFactory = this.transactionManager != null
      ? this.transactionManager.getProducerFactory()
    // Default to the conventional "error.<topic>.<group>" DLQ name when none is configured.
    String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
        ? kafkaConsumerProperties.getDlqName() : "error." + record.topic() + "." + group;
    dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName);
  };

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

// NOTE(review): truncated excerpt — the enclosing method and several statements
// are missing; it does not compile as shown.
Assert.isTrue(!anonymous || !consumerProperties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
source.setRawMessageHeader(consumerProperties.getExtension().isEnableDlq());
// A per-binding client.id in the extension configuration overrides the destination name.
String clientId = name;
if (consumerProperties.getExtension().getConfiguration().containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
  clientId = consumerProperties.getExtension().getConfiguration().get(ConsumerConfig.CLIENT_ID_CONFIG);

相关文章