org.apache.kafka.common.utils.Utils.toPositive()方法的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(3.9k)|赞(0)|评价(0)|浏览(178)

本文整理了Java中org.apache.kafka.common.utils.Utils.toPositive()方法的一些代码示例,展示了Utils.toPositive()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Utils.toPositive()方法的具体详情如下:
包路径:org.apache.kafka.common.utils.Utils
类名称:Utils
方法名:toPositive

Utils.toPositive介绍

[英]A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. When the input number is negative, the returned positive value is the original value bit AND against 0x7fffffff which is not its absolutely value. Note: changing this method in the future will possibly cause partition selection not to be compatible with the existing messages already placed on a partition since it is used in producer's org.apache.kafka.clients.producer.internals.DefaultPartitioner
[中]一种廉价的方法,可以确定地将一个数字转换为正值。当输入为正时,返回原始值。当输入数字为负数时,返回的正值是原始值位,而0x7fffffff不是其绝对值。注意:将来更改此方法可能会导致分区选择与已放置在分区上的现有消息不兼容,因为它在producer's org中使用。阿帕奇。卡夫卡。客户。制作人内部。默认分区器

代码示例

代码示例来源:origin: apache/kafka

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  int numPartitions = partitions.size();
  if (keyBytes == null) {
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      int part = Utils.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return Utils.toPositive(nextValue) % numPartitions;
    }
  } else {
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}

代码示例来源:origin: sixt/ja-micro

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  int numPartitions = partitions.size();
  if (keyBytes == null) {
    int nextValue = roundRobin.getAndIncrement();
    return Utils.toPositive(nextValue) % numPartitions;
  } else {
    // hash the keyBytes to choose a partition
    return Utils.toPositive(xxHasher.hash(keyBytes, 0, keyBytes.length, SEED)) % numPartitions;
  }
}

代码示例来源:origin: simplesteph/medium-blog-kafka-udemy

private boolean isValidReview(Review review) {
  try {
    int hash = Utils.toPositive(Utils.murmur2(review.toByteBuffer().array()));
    return  (hash % 100) >= 5; // 95 % of the reviews will be valid reviews
  } catch (IOException e) {
    return false;
  }
}

代码示例来源:origin: rayokota/kafka-graphs

private static <K> int vertexToPartition(K vertex, Serializer<K> serializer, int numPartitions) {
  // TODO make configurable, currently this is tied to DefaultStreamPartitioner
  byte[] keyBytes = serializer.serialize(null, vertex);
  int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  return partition;
}

代码示例来源:origin: org.apache.kafka/kafka-streams

/**
   * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
   * and the current number of partitions. The partition number id determined by the original key of the windowed key
   * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
   *
   * @param topic the topic name this record is sent to
   * @param windowedKey the key of the record
   * @param value the value of the record
   * @param numPartitions the total number of partitions
   * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
   */
  @Override
  public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
    final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);

    // hash the keyBytes to choose a partition
    return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}

相关文章