【Flink】The flink keyBy uneven distribution problem


1. Overview

Reposted with additions: the flink keyBy uneven distribution problem.

I use the random number random.nextInt(8) as the key, build a KeyedStream, and sink it straight to storage. But only four of the sink operator's slots are busy while the other four sit idle, so the data skew is severe. What is the cause?

My first thought was that the random numbers might not be uniform, so I tested them, and it turned out the distribution is quite even:

/**
 * Test: are the random values really uniform?
 *
 * {0=12531, 1=12448, 2=12538, 3=12466, 4=12530, 5=12563, 6=12462, 7=12462}
 *
 * The counts show the distribution is quite even.
 */
public void randomIsRandom() {
    Map<Integer, Long> map = new HashMap<>();
    Random random = new Random();
    for (int i = 0; i < 100000; i++) {
        int value = random.nextInt(8);
        // count how many times each value was drawn
        map.merge(value, 1L, Long::sum);
    }
    System.out.println(map);
}
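So the randomness itself is fine; the skew must come from how Flink maps a key to a subtask. As a quick cross-check (a minimal sketch, not from the original post, assuming flink-runtime on the classpath), you can run the same keys 0-7 through Flink's own key-to-operator routing and print where each one lands. With the default maxParallelism of 128, several keys collide on the same subtask index, which matches the four busy / four idle slots described above:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyToSubtaskCheck {
    public static void main(String[] args) {
        int parallelism = 8;
        int maxParallelism = 128; // Flink's default when none is configured
        for (int key = 0; key < 8; key++) {
            // the same routing keyBy uses: murmurHash(hashCode) -> key group -> operator index
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}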

2. Case 2

Later I came across the blog post "flink keyby 分布不均匀问题" (flink keyBy uneven distribution problem), which describes the same mistake.

A Flink job reads from Kafka, applies keyBy, and processes the data once per second with .timeWindow(Time.seconds(1)). To spread records evenly across the subtasks of the window operator according to the parallelism, a random integer was used as the key:

dataStream.map(
        new MapFunction<String, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> map(String value) throws Exception {
                // draw a fresh key in [0, parallelism) for each record
                int key = RandomUtils.nextInt(parallelism);
                return new Tuple2<>(key, value);
            }
        })
        .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple2 -> tuple2.f0)
        .timeWindow(Time.seconds(1));

2.1 Problem

With parallelism 2, all of the data went to one subtask while the other received nothing.
With parallelism > 2, some subtasks likewise received far more data than others, i.e. the same data-skew problem.

2.2 Cause

This is related to Flink's Key Groups.
For background on key groups, see:

Flink中Key Groups与最大并行度 (Key Groups and maximum parallelism in Flink)

Flink状态的缩放(rescale)与键组(Key Group)设计 (Rescaling Flink state and the Key Group design)

When Flink partitions by key, it applies a murmur hash on top of the key's hashCode(), in order to scatter real-world keys as much as possible and reduce collisions. But for small hand-crafted numeric keys like ours, several keys end up with the same computed subtask index, so some subtasks receive no data at all.

The formula:

// maxParallelism defaults to 128; parallelism is the user-defined parallelism
subtaskIndex = (MathUtils.murmurHash(key.hashCode()) % maxParallelism) * parallelism / maxParallelism;
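Plugging the parallelism-2 case into this formula makes the collision concrete (a minimal sketch, not from the original post, assuming org.apache.flink.util.MathUtils on the classpath). As the key list printed in section 2.3 below confirms, keys 0 and 1 both map to subtask 1, which is exactly the "one subtask gets everything" symptom:

import org.apache.flink.util.MathUtils;

public class SubtaskIndexDemo {
    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 2;
        for (int key = 0; key < parallelism; key++) {
            // step 1: key group = murmurHash(hashCode) % maxParallelism
            int keyGroup = MathUtils.murmurHash(Integer.hashCode(key)) % maxParallelism;
            // step 2: operator index = keyGroup * parallelism / maxParallelism
            int subtaskIndex = keyGroup * parallelism / maxParallelism;
            System.out.println("key " + key + " -> keyGroup " + keyGroup + " -> subtask " + subtaskIndex);
        }
    }
}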

The actual implementation lives in the KeyGroupRangeAssignment class:

/**
 * Assigns the given key to a parallel operator index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @param parallelism the current parallelism of the operator
 * @return the index of the parallel operator to which the given key should be routed.
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
	Preconditions.checkNotNull(key, "Assigned key must not be null!");
	return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int assignToKeyGroup(Object key, int maxParallelism) {
	Preconditions.checkNotNull(key, "Assigned key must not be null!");
	return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

/**
 * Assigns the given key to a key-group index.
 *
 * @param keyHash the hash of the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @return the key-group to which the given key is assigned
 */
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
	return MathUtils.murmurHash(keyHash) % maxParallelism;
}

/**
 * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
 * parallelism.
 *
 * <p>IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
 * to go beyond this boundary, this method must perform arithmetic on long values.
 *
 * @param maxParallelism Maximal parallelism that the job was initially created with.
 *                       0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
 * @param keyGroupId     Id of a key-group. 0 <= keyGroupID < maxParallelism.
 * @return The index of the operator to which elements from the given key-group should be routed under the given
 * parallelism and maxParallelism.
 */
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
	return keyGroupId * parallelism / maxParallelism;
}

2.3 Solution

For this kind of skew with manually assigned keys, the fix is to enumerate a few concrete key values to use in place of raw random numbers, chosen so that the resulting subtaskIndex values are evenly distributed.

Generating the required keys:

import org.apache.flink.util.MathUtils;

import java.util.HashMap;
import java.util.LinkedHashSet;

public static void main(String[] args) {
    HashMap<Integer, LinkedHashSet<Integer>> result = new HashMap<>();
    int parallelism = 2;      // the chosen parallelism
    int maxParallelism = 128; // the default max parallelism
    int maxRandomKey = parallelism * 10;
    for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
        // the same two-step routing Flink applies: key group, then operator index
        int subtaskIndex = (MathUtils.murmurHash(Integer.valueOf(randomKey).hashCode()) % maxParallelism) * parallelism / maxParallelism;
        LinkedHashSet<Integer> randomKeys = result.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
        randomKeys.add(randomKey);
    }
    result.forEach((k, v) -> System.out.println("subtaskIndex: " + k + ", randomKeys: " + v));
}
The output:

subtaskIndex: 0, randomKeys: [4, 6, 8, 9, 12, 13, 14]
subtaskIndex: 1, randomKeys: [0, 1, 2, 3, 5, 7, 10, 11, 15, 16, 17, 18, 19]

Pick one value from each subtaskIndex group, for example 4 for subtaskIndex 0 and 5 for subtaskIndex 1,

and then rework the key used in the original keyBy:

Integer[] rebalanceKeys = new Integer[]{4, 5};

dataStream.map(
        new MapFunction<String, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> map(String value) throws Exception {
                // the index into rebalanceKeys corresponds to the target subtaskIndex
                int key = rebalanceKeys[RandomUtils.nextInt(parallelism)];
                return new Tuple2<>(key, value);
            }
        })
        .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple2 -> tuple2.f0)
        .timeWindow(Time.seconds(1));

As a further improvement, you can extract the generation of the rebalanceKeys array into a utility method; I'm sure you can manage that.


If you don't have time to try it yourself, refer to the example below, where createRebalanceKeys(int parallelism) is exactly that extracted utility method.

Here is a complete example:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Test {

    /**
     * Task description:
     *    group the input words by the given parallelism, windowing each group every 5 seconds
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int parallelism = 5;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .setParallelism(parallelism)
                .enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        DataStream<List<String>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter(parallelism))
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                })
                // use the values from rebalanceKeys as the grouping key
                .keyBy((KeySelector<Tuple2<Integer, String>, Integer>) tuple -> tuple.f0)
                // build the window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<Integer, String>, List<String>, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window,
                                      Iterable<Tuple2<Integer, String>> input, Collector<List<String>> out) throws Exception {
                        List<String> lists = new ArrayList<>();
                        // value.f0 is the key assigned earlier, value.f1 is the word
                        input.forEach(value -> lists.add(value.f1));
                        out.collect(lists);
                    }
                })
                //   todo: process each window's data here, e.g. batch-write to HBase
                //  .addSink(...)
                ;

        env.execute("job name");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<Integer, String>> {

        private int parallelism;
        private Integer[] rebalanceKeys;

        public Splitter(int parallelism) {
            this.parallelism = parallelism;
            // key array used to balance load across subtasks
            this.rebalanceKeys = createRebalanceKeys(parallelism);
        }

        @Override
        public void flatMap(String sentence, Collector<Tuple2<Integer, String>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                // hash the word and take it modulo the parallelism to get an index into
                // rebalanceKeys; the value found there becomes the grouping key below.
                // Using the word's raw hash directly as the keyBy key would very likely
                // produce an uneven grouping again.
                int rebalanceKeyIndex = Math.abs(word.hashCode() % parallelism);
                Integer key = rebalanceKeys[rebalanceKeyIndex];
                out.collect(new Tuple2<Integer, String>(key, word));
            }
        }
    }

    /**
     * Build the balanced key array.
     *
     * @param parallelism the operator parallelism
     * @return an array with one key per subtask index
     */
    public static Integer[] createRebalanceKeys(int parallelism) {
        HashMap<Integer, LinkedHashSet<Integer>> groupRanges = new HashMap<>();
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        // generate enough candidate keys so that every subtask index gets at least one
        int maxRandomKey = parallelism * 10;
        for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
            int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism);
            LinkedHashSet<Integer> randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
            randomKeys.add(randomKey);
        }

        Integer[] result = new Integer[parallelism];
        for (int i = 0; i < parallelism; i++) {
            LinkedHashSet<Integer> ranges = groupRanges.get(i);
            if (ranges == null || ranges.isEmpty()) {
                throw new RuntimeException("create rebalance keys error");
            }
            result[i] = ranges.stream().findFirst().get();
        }
        return result;
    }
}
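As a quick sanity check on the generated array (a hypothetical snippet, not part of the original example), every key should route back to the subtask matching its array index, because createRebalanceKeys grouped the candidate keys by exactly that routing. Note this only holds while the job keeps Flink's default max parallelism, which computeDefaultMaxParallelism mirrors:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class RebalanceKeysCheck {
    public static void main(String[] args) {
        int parallelism = 5;
        Integer[] keys = Test.createRebalanceKeys(parallelism);
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        for (int i = 0; i < parallelism; i++) {
            // expect: routed == i for every index
            int routed = KeyGroupRangeAssignment.assignKeyToParallelOperator(keys[i], maxParallelism, parallelism);
            System.out.println("index " + i + ", key " + keys[i] + " -> subtask " + routed);
        }
    }
}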
