org.springframework.cloud.stream.binder.ExtendedConsumerProperties.getInstanceCount()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(92)

本文整理了Java中org.springframework.cloud.stream.binder.ExtendedConsumerProperties.getInstanceCount()方法的一些代码示例,展示了ExtendedConsumerProperties.getInstanceCount()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ExtendedConsumerProperties.getInstanceCount()方法的具体详情如下:
包路径:org.springframework.cloud.stream.binder.ExtendedConsumerProperties
类名称:ExtendedConsumerProperties
方法名:getInstanceCount

ExtendedConsumerProperties.getInstanceCount介绍

暂无

代码示例

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

/**
 * Selects the partitions of {@code topic} that this instance listens to and records the
 * selection in {@code topicsInUse}.
 *
 * <p>When {@code usingPatterns} is true no partition lookup is performed and the candidate
 * set is empty. When {@code groupManagement} is true, or the configured instance count is 1,
 * every candidate is kept. Otherwise only candidates whose partition id modulo the instance
 * count equals the instance index are kept.
 *
 * @param group stored in the {@code TopicInformation} recorded for the topic
 * @param extendedConsumerProperties source of the instance count and instance index
 * @param consumerFactory forwarded to {@code getPartitionInfo}
 * @param partitionCount forwarded to {@code getPartitionInfo}
 * @param usingPatterns when true, partition lookup is skipped
 * @param groupManagement when true, all candidate partitions are kept
 * @param topic key under which the selection is recorded
 * @return the partitions selected for this instance
 */
public Collection<PartitionInfo> processTopic(final String group,
    final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
    final ConsumerFactory<?, ?> consumerFactory, int partitionCount, boolean usingPatterns,
    boolean groupManagement, String topic) {
  final Collection<PartitionInfo> candidates;
  if (usingPatterns) {
    candidates = Collections.emptyList();
  }
  else {
    candidates = getPartitionInfo(topic, extendedConsumerProperties, consumerFactory, partitionCount);
  }

  final boolean keepAll = groupManagement
      || extendedConsumerProperties.getInstanceCount() == 1;
  final Collection<PartitionInfo> selected;
  if (keepAll) {
    selected = candidates;
  }
  else {
    // Spread partitions across instances: instance i takes partitions where id % count == i.
    selected = new ArrayList<>();
    final int instances = extendedConsumerProperties.getInstanceCount();
    final int ownIndex = extendedConsumerProperties.getInstanceIndex();
    for (PartitionInfo candidate : candidates) {
      if (candidate.partition() % instances == ownIndex) {
        selected.add(candidate);
      }
    }
  }

  this.topicsInUse.put(topic, new TopicInformation(group, selected, usingPatterns));
  return selected;
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Selects the partitions of {@code topic} that this instance listens to and records the
 * selection in {@code topicsInUse}.
 *
 * <p>When {@code usingPatterns} is true no partition lookup is performed and the candidate
 * set is empty. When {@code groupManagement} is true, or the configured instance count is 1,
 * every candidate is kept. Otherwise only candidates whose partition id modulo the instance
 * count equals the instance index are kept.
 *
 * @param group stored in the {@code TopicInformation} recorded for the topic
 * @param extendedConsumerProperties source of the instance count and instance index
 * @param consumerFactory forwarded to {@code getPartitionInfo}
 * @param partitionCount forwarded to {@code getPartitionInfo}
 * @param usingPatterns when true, partition lookup is skipped
 * @param groupManagement when true, all candidate partitions are kept
 * @param topic key under which the selection is recorded
 * @return the partitions selected for this instance
 */
public Collection<PartitionInfo> processTopic(final String group,
    final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
    final ConsumerFactory<?, ?> consumerFactory, int partitionCount, boolean usingPatterns,
    boolean groupManagement, String topic) {
  final Collection<PartitionInfo> candidates;
  if (usingPatterns) {
    candidates = Collections.emptyList();
  }
  else {
    candidates = getPartitionInfo(topic, extendedConsumerProperties, consumerFactory, partitionCount);
  }

  final boolean keepAll = groupManagement
      || extendedConsumerProperties.getInstanceCount() == 1;
  final Collection<PartitionInfo> selected;
  if (keepAll) {
    selected = candidates;
  }
  else {
    // Spread partitions across instances: instance i takes partitions where id % count == i.
    selected = new ArrayList<>();
    final int instances = extendedConsumerProperties.getInstanceCount();
    final int ownIndex = extendedConsumerProperties.getInstanceIndex();
    for (PartitionInfo candidate : candidates) {
      if (candidate.partition() % instances == ownIndex) {
        selected.add(candidate);
      }
    }
  }

  this.topicsInUse.put(topic, new TopicInformation(group, selected, usingPatterns));
  return selected;
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kinesis

/**
 * Provisions the inbound Kinesis destination for {@code name}.
 *
 * <p>Defaults the header mode to {@code HeaderMode.embeddedHeaders} when none is set, then
 * passes {@code instanceCount * concurrency} as the shard count to {@code createOrUpdate}.
 *
 * @param name stream name, used for logging, {@code createOrUpdate} and the returned destination
 * @param group not used by this method
 * @param properties consumer settings; its header mode may be modified as described above
 * @return a {@code KinesisConsumerDestination} wrapping the result of {@code createOrUpdate}
 * @throws ProvisioningException declared by the signature; not thrown directly in this body
 */
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
    ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
  if (logger.isInfoEnabled()) {
    logger.info("Using Kinesis stream for inbound: " + name);
  }

  if (null == properties.getHeaderMode()) {
    properties.setHeaderMode(HeaderMode.embeddedHeaders);
  }

  final int instances = properties.getInstanceCount();
  final int concurrency = properties.getConcurrency();
  return new KinesisConsumerDestination(name, createOrUpdate(name, instances * concurrency));
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kinesis-core

/**
 * Provisions the inbound Kinesis destination for {@code name}.
 *
 * <p>Defaults the header mode to {@code HeaderMode.embeddedHeaders} when none is set, then
 * passes {@code instanceCount * concurrency} as the shard count to {@code createOrUpdate}.
 *
 * @param name stream name, used for logging, {@code createOrUpdate} and the returned destination
 * @param group not used by this method
 * @param properties consumer settings; its header mode may be modified as described above
 * @return a {@code KinesisConsumerDestination} wrapping the result of {@code createOrUpdate}
 * @throws ProvisioningException declared by the signature; not thrown directly in this body
 */
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
    ExtendedConsumerProperties<KinesisConsumerProperties> properties) throws ProvisioningException {
  if (logger.isInfoEnabled()) {
    logger.info("Using Kinesis stream for inbound: " + name);
  }

  if (null == properties.getHeaderMode()) {
    properties.setHeaderMode(HeaderMode.embeddedHeaders);
  }

  final int instances = properties.getInstanceCount();
  final int concurrency = properties.getConcurrency();
  return new KinesisConsumerDestination(name, createOrUpdate(name, instances * concurrency));
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-aws-kinesis

/**
 * Provisions the inbound Kinesis destination for {@code name}.
 *
 * <p>Defaults the header mode to {@code HeaderMode.embeddedHeaders} when none is set, then
 * passes {@code instanceCount * concurrency} as the shard count to {@code createOrUpdate}.
 *
 * @param name stream name, used for logging, {@code createOrUpdate} and the returned destination
 * @param group not used by this method
 * @param properties consumer settings; its header mode may be modified as described above
 * @return a {@code KinesisConsumerDestination} wrapping the result of {@code createOrUpdate}
 * @throws ProvisioningException declared by the signature; not thrown directly in this body
 */
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
    ExtendedConsumerProperties<KinesisConsumerProperties> properties)
    throws ProvisioningException {
  if (logger.isInfoEnabled()) {
    logger.info("Using Kinesis stream for inbound: " + name);
  }

  if (null == properties.getHeaderMode()) {
    properties.setHeaderMode(HeaderMode.embeddedHeaders);
  }

  final int instances = properties.getInstanceCount();
  final int concurrency = properties.getConcurrency();
  return new KinesisConsumerDestination(name, createOrUpdate(name, instances * concurrency));
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
if (properties.getInstanceCount() == 0) {
  throw new IllegalArgumentException("Instance count cannot be zero");
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
try (AdminClient adminClient = createAdminClient()) {

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core

Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
    "DLQ support is not available for anonymous subscriptions");
if (properties.getInstanceCount() == 0) {
  throw new IllegalArgumentException("Instance count cannot be zero");
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
try (AdminClient adminClient = createAdminClient()) {

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11-core

/**
 * Provisions the Kafka consumer destination for topic {@code name}.
 *
 * <p>Validates the topic name, rejects DLQ use for anonymous (group-less) subscriptions and a
 * zero instance count, then requests {@code instanceCount * concurrency} partitions through
 * {@code createTopicsIfAutoCreateEnabledAndAdminUtilsPresent}. When topic auto-creation is
 * enabled and an admin-utils operation is available, the actual partition count is read and,
 * if DLQ is enabled for a named group, a DLQ topic with the same partition count is created.
 *
 * @param name topic to consume from
 * @param group consumer group; empty or blank text marks the subscription as anonymous
 * @param properties consumer settings (instance count, concurrency, Kafka extension)
 * @return a destination carrying the partition count (and DLQ topic name, if any) when the
 *         partition count could be read, otherwise a destination with only the topic name
 * @throws IllegalArgumentException if the instance count is zero, or if DLQ is enabled for an
 *         anonymous subscription (via {@code Assert.isTrue})
 */
@Override
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
  KafkaTopicUtils.validateTopicName(name);
  boolean anonymous = !StringUtils.hasText(group);
  Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
      "DLQ support is not available for anonymous subscriptions");
  if (properties.getInstanceCount() == 0) {
    throw new IllegalArgumentException("Instance count cannot be zero");
  }
  int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
  createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
  if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
    final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
        this.configurationProperties.getZkSessionTimeout(),
        this.configurationProperties.getZkConnectionTimeout(),
        JaasUtils.isZkSecurityEnabled());
    final int partitions;
    try {
      partitions = adminUtilsOperation.partitionSize(name, zkUtils);
    }
    finally {
      // The ZooKeeper client is only needed for the partition lookup; release it here so
      // each provisioning call does not leave a ZooKeeper connection open.
      zkUtils.close();
    }
    if (properties.getExtension().isEnableDlq() && !anonymous) {
      // Use the configured DLQ name if present, otherwise derive it from topic and group.
      String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
          properties.getExtension().getDlqName() : "error." + name + "." + group;
      createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
      return new KafkaConsumerDestination(name, partitions, dlqTopic);
    }
    return new KafkaConsumerDestination(name, partitions);
  }
  return new KafkaConsumerDestination(name);
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kinesis

if (properties.getInstanceCount() > 1) {
  shardOffsets = new HashSet<>();
  KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
  for (int i = 0; i < shards.size(); i++) {
    if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
      KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset);
      shardOffset.setStream(destination.getName());

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11

final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
    extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount()
    extendedConsumerProperties.getInstanceCount() == 1) {
  listenedPartitions = allPartitions;
        % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties
            .getInstanceIndex()) {
      listenedPartitions.add(partition);

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-aws-kinesis

if (properties.getInstanceCount() > 1) {
  shardOffsets = new HashSet<>();
  KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
  for (int i = 0; i < shards.size(); i++) {
    if ((i % properties.getInstanceCount()) == properties
        .getInstanceIndex()) {
      KinesisShardOffset shardOffset = new KinesisShardOffset(

代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka

final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
    extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount()

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
    extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount()

相关文章