在流启动后触发流新作业

dfuffjeb  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(242)

我有一个情况,我试图流使用Spark流从Kafka。这条河是直达的。我可以创建一个流,然后开始流,也可以获得任何更新(如果有的话)Kafka通过流。
当我有一个新的请求来流式处理一个新的主题时,问题就出现了。因为sparkstreaming上下文只能是每个jvm的1,所以我不能为每个新请求创建一个新流。
我想的是
一旦创建了一个数据流并且spark流已经在进行中,只需将一个新的流附加到它。这似乎不起作用,createdstream(对于一个新的topic2)不返回流,进一步的处理被停止。在第一个请求(比如topic1)时,流继续进行。
第二,我想停止流,创建dstream,然后重新开始流。我不能使用相同的流上下文(它抛出一个例外,即流停止后不能添加作业),如果我为新主题(topic2)创建一个新的流,则旧的流主题(topic1)将丢失,并且只流新主题。
这是密码,看一下

JavaStreamingContext javaStreamingContext;
        if(null == javaStreamingContext) {
            javaStreamingContext =  JavaStreamingContext(sparkContext, Durations.seconds(duration));
        } else {
            StreamingContextState streamingContextState = javaStreamingContext.getState();
            if(streamingContextState == StreamingContextState.STOPPED) {
                javaStreamingContext =  JavaStreamingContext(sparkContext, Durations.seconds(duration));
            }

        }
Collection<String> topics = Arrays.asList(getTopicName(schemaName));
        SparkVoidFunctionImpl impl = new SparkVoidFunctionImpl(getSparkSession());

        KafkaUtils.createDirectStream(javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))
                .map((stringStringConsumerRecord) -> stringStringConsumerRecord.value())
                .foreachRDD(impl);
if (javaStreamingContext.getState() == StreamingContextState.ACTIVE) {

            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();
        }

不用担心sparkvoidfunctionimpl,这是一个自定义类,它是voidfunction的实现。
以上是方法1,我不停止现有的流。当一个新的请求进入这个方法时,它没有得到一个新的流对象,它试图创建一个数据流。问题是永远不会返回dstream对象。

KafkaUtils.createDirectStream(javaStreamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))

这不会返回数据流,控件只是在没有错误的情况下终止。不会执行进一步的步骤。
我试过很多东西,读过多篇文章,但我相信这是一个非常常见的生产层面的问题。任何流式处理都是在多个不同的主题上进行的,每个主题的处理方式都不同。
请帮忙

qmelpv7a

qmelpv7a1#

问题是spark master会将代码发送给工作者,尽管数据是流式传输的,但除非重新启动作业,否则底层代码和变量值将保持静态。
我能想到的几个选择:
spark作业服务器:每当您想订阅/流式处理不同的主题而不是接触已经运行的作业时,请启动一个新作业。从api主体中,可以提供参数或主题名称。如果你想停止从一个特定的主题流,只要停止各自的工作。它将给你很大的灵活性和对资源的控制。
[理论]主题过滤器:订阅您认为需要的所有主题,当在一段时间内提取记录时,根据主题列表过滤出记录。通过api操纵这个主题列表来增加或减少主题的范围,它也可以是一个广播变量。这只是一个想法,我根本没有尝试过这个选择。
另一个解决方法是在需要时使用microservice将topic-2数据中继到topic-1,如果不需要就停止。

相关问题