org.apache.kafka.common.utils.Utils.murmur2()方法的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(5.0k)|赞(0)|评价(0)|浏览(198)

本文整理了Java中org.apache.kafka.common.utils.Utils.murmur2()方法的一些代码示例,展示了Utils.murmur2()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Utils.murmur2()方法的具体详情如下:
包路径:org.apache.kafka.common.utils.Utils
类名称:Utils
方法名:murmur2

Utils.murmur2介绍

[英]Generates 32 bit murmur2 hash from byte array
[中]从字节数组生成32位2哈希

代码示例

代码示例来源:origin: apache/incubator-pinot

@Override
public int getPartition(Object valueIn) {
 String value = (valueIn instanceof String) ? (String) valueIn : valueIn.toString();
 return (Utils.murmur2(StringUtil.encodeUtf8(value)) & 0x7fffffff) % _numPartitions;
}

代码示例来源:origin: linkedin/kafka-monitor

public int partition(String key, int partitionNum) {
 byte[] keyBytes = key.getBytes();
 return toPositive(murmur2(keyBytes)) % partitionNum;
}

代码示例来源:origin: apache/kafka

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  int numPartitions = partitions.size();
  if (keyBytes == null) {
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      int part = Utils.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return Utils.toPositive(nextValue) % numPartitions;
    }
  } else {
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}

代码示例来源:origin: vakinge/jeesuite-libs

public DefaultMessage partitionFactor(Serializable partitionFactor) {
  if(partitionFactor != null){
    partitionHash = Utils.murmur2(partitionFactor.toString().getBytes());
  }
  return this;
}

代码示例来源:origin: simplesteph/medium-blog-kafka-udemy

private boolean isValidReview(Review review) {
  try {
    int hash = Utils.toPositive(Utils.murmur2(review.toByteBuffer().array()));
    return  (hash % 100) >= 5; // 95 % of the reviews will be valid reviews
  } catch (IOException e) {
    return false;
  }
}

代码示例来源:origin: rayokota/kafka-graphs

private static <K> int vertexToPartition(K vertex, Serializer<K> serializer, int numPartitions) {
  // TODO make configurable, currently this is tied to DefaultStreamPartitioner
  byte[] keyBytes = serializer.serialize(null, vertex);
  int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  return partition;
}

代码示例来源:origin: Nepxion/Thunder

@SuppressWarnings("resource")
  private int getPartitionIndex(Consumer<String, byte[]> consumer, String topic, String key) {
    int partitionNumber = consumer.partitionsFor(topic).size();

    StringSerializer keySerializer = new StringSerializer();
    byte[] serializedKey = keySerializer.serialize(topic, key);

    int positive = Utils.murmur2(serializedKey) & 0x7fffffff;

    return positive % partitionNumber;
  }
}

代码示例来源:origin: org.apache.kafka/kafka-streams

/**
   * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
   * and the current number of partitions. The partition number id determined by the original key of the windowed key
   * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
   *
   * @param topic the topic name this record is sent to
   * @param windowedKey the key of the record
   * @param value the value of the record
   * @param numPartitions the total number of partitions
   * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
   */
  @Override
  public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
    final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);

    // hash the keyBytes to choose a partition
    return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}

代码示例来源:origin: vakinge/jeesuite-libs

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
  int numPartitions = partitions.size();
  try {			
    long partitionHash = ((DefaultMessage)value).partitionHash();
    //按hash分区
    if(partitionHash > 0){
      long index = partitionHash % numPartitions;
      //System.out.println("numPartitions:"+numPartitions+",partitionHash:"+partitionHash + ",index:"+index);
      return (int)index;
    }
  } catch (ClassCastException e) {}
  
  if (keyBytes == null) {
    int nextValue = counter.getAndIncrement();
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return DefaultPartitioner.toPositive(nextValue) % numPartitions;
    }
  } else {
    // hash the keyBytes to choose a partition
    return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}

代码示例来源:origin: me.jeffshaw.kafka/kafka-clients

return Utils.abs(Utils.murmur2(record.key())) % numPartitions;

相关文章