flink流不与工作线程拆分作业

plupiseo  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(324)

我的目标是建立一个以kafka为源,flink为流处理引擎的高吞吐量集群。以下是我所做的。
我在主机和从机上设置了一个2节点集群,配置如下。
flink-conf.yaml大师

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

从属flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

主节点上的从属文件如下所示:

<SLAVE_IP_ADDR>
localhost

两个节点上的flink设置都位于同名的文件夹中。我通过运行

bin/start-cluster-streaming.sh

这将启动从属节点上的任务管理器。
我的输入源是Kafka。以下是片段。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

这是我的Flume功能

public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}

下面是pom.xml中的flink依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

然后我在主服务器上用这个命令运行打包的jar

bin/flink run flink-test-jar-with-dependencies.jar

但是,当我在kafka主题中插入消息时,我能够解释来自kafka主题的所有消息(通过调用的invoke方法中的debug消息) SinkFunction 实现)仅在主节点上。
在作业管理器ui中,我可以看到2个任务管理器,如下所示:

Jmeter 板看起来也是这样:

问题:
为什么从节点没有得到任务?
我缺少一些配置吗?

xzabzqsa

xzabzqsa1#

在flink中读取kafka源时,源任务的最大并行度受给定kafka主题的分区数限制。Kafka分区是flink中源任务可以使用的最小单元。如果分区多于源任务,则某些任务将使用多个分区。
因此,为了向所有100个任务提供输入,您应该确保您的kafka主题至少有100个分区。
如果不能更改主题的分区数,那么也可以使用 setParallelism 方法。或者,您可以使用 rebalance 方法,该方法将在前面操作的所有可用任务中无序排列数据。

相关问题