在flink中广播hashmap

mznpcxlj  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(561)

我正在使用 Flink v.1.4.0 .
我在和 DataSet api和我想尝试的一件事与广播变量在 Apache Spark .
实际上,我想在一个 DataSet ,遍历 DataSet 在一个 HashMap ; 如果Map中存在搜索元素,则检索相应的值。
这个 HashMap 它非常大,我不知道是否需要(因为我还没有构建我的解决方案) Serializable 供全体职工同时传输和使用。
一般来说,我想到的解决方案是这样的:

Map<String, T> hashMap = new ... ;

DataSet<Point> points = env.readCsv(...);

points
  .map(point -> hashMap.getOrDefault(point.getId, 0))
  ...

但我不知道这是否有效,也不知道它是否有效。在做了一点搜索之后,我在这里找到了一个更好的例子,根据这个例子我们可以 Broadcast 中的变量 Flink 广播 List 具体如下:

DataSet<Point> points = env.readCsv(...);

DataSet<Centroid> centroids = ... ; // some computation

points.map(new RichMapFunction<Point, Integer>() {

    private List<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Integer map(Point p) {
        return selectCentroid(centroids, p);
    }

}).withBroadcastSet("centroids", centroids);

然而, .getBroadcastVariable() 似乎只适用于 List .
有人能提供一个替代的解决方案吗 HashMap ?
这个解决方案是如何工作的?
解决这个问题最有效的方法是什么?
是否可以使用flink管理的状态来执行类似于广播变量使用方式的操作?怎样?
最后,我可以尝试多次吗 mappings 在管道中有多个广播变量?

yr9zkbsy

yr9zkbsy1#

价值观在哪里 hashMap 从哪里来?其他两种可能的解决方案:
重新初始化/重新创建/重新生成 hashMap 在open方法中,分别在筛选/Map操作符的每个示例中。可能每个记录的效率更高,但重复了初始化逻辑。
创建两个 DataSet ,一个 hashMap 值,秒 points 加入这两个队伍 DataSet 正在使用所需的连接策略。作为类比,您试图做的事情可以用sql查询来表示 SELECT * FROM points p, hashMap h WHERE h.key = p.id .

相关问题