
x33g5p2x  于2022-02-01 转载在 其他  



[英]Generates 32 bit murmur2 hash from byte array


代码示例来源:origin: apache/incubator-pinot

public int getPartition(Object valueIn) {
 String value = (valueIn instanceof String) ? (String) valueIn : valueIn.toString();
 return (Utils.murmur2(StringUtil.encodeUtf8(value)) & 0x7fffffff) % _numPartitions;

代码示例来源:origin: linkedin/kafka-monitor

public int partition(String key, int partitionNum) {
 byte[] keyBytes = key.getBytes();
 return toPositive(murmur2(keyBytes)) % partitionNum;

代码示例来源:origin: apache/kafka

 * Compute the partition for the given record.
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  int numPartitions = partitions.size();
  if (keyBytes == null) {
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      int part = Utils.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return Utils.toPositive(nextValue) % numPartitions;
  } else {
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

代码示例来源:origin: vakinge/jeesuite-libs

public DefaultMessage partitionFactor(Serializable partitionFactor) {
  if(partitionFactor != null){
    partitionHash = Utils.murmur2(partitionFactor.toString().getBytes());
  return this;

代码示例来源:origin: simplesteph/medium-blog-kafka-udemy

private boolean isValidReview(Review review) {
  try {
    int hash = Utils.toPositive(Utils.murmur2(review.toByteBuffer().array()));
    return  (hash % 100) >= 5; // 95 % of the reviews will be valid reviews
  } catch (IOException e) {
    return false;

代码示例来源:origin: rayokota/kafka-graphs

private static <K> int vertexToPartition(K vertex, Serializer<K> serializer, int numPartitions) {
  // TODO make configurable, currently this is tied to DefaultStreamPartitioner
  byte[] keyBytes = serializer.serialize(null, vertex);
  int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  return partition;

代码示例来源:origin: Nepxion/Thunder

  private int getPartitionIndex(Consumer<String, byte[]> consumer, String topic, String key) {
    int partitionNumber = consumer.partitionsFor(topic).size();

    StringSerializer keySerializer = new StringSerializer();
    byte[] serializedKey = keySerializer.serialize(topic, key);

    int positive = Utils.murmur2(serializedKey) & 0x7fffffff;

    return positive % partitionNumber;

代码示例来源:origin: org.apache.kafka/kafka-streams

   * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
   * and the current number of partitions. The partition number id determined by the original key of the windowed key
   * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
   * @param topic the topic name this record is sent to
   * @param windowedKey the key of the record
   * @param value the value of the record
   * @param numPartitions the total number of partitions
   * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
  public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
    final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);

    // hash the keyBytes to choose a partition
    return toPositive(Utils.murmur2(keyBytes)) % numPartitions;

代码示例来源:origin: vakinge/jeesuite-libs

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
  int numPartitions = partitions.size();
  try {			
    long partitionHash = ((DefaultMessage)value).partitionHash();
    if(partitionHash > 0){
      long index = partitionHash % numPartitions;
      //System.out.println("numPartitions:"+numPartitions+",partitionHash:"+partitionHash + ",index:"+index);
      return (int)index;
  } catch (ClassCastException e) {}
  if (keyBytes == null) {
    int nextValue = counter.getAndIncrement();
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return DefaultPartitioner.toPositive(nextValue) % numPartitions;
  } else {
    // hash the keyBytes to choose a partition
    return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

代码示例来源:origin: me.jeffshaw.kafka/kafka-clients

return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
