Spark流-基于滤波器参数分割输入流的最佳方法

jjjwad0x  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(335)

我目前正在尝试创建某种监控解决方案—一些数据被写入kafka,我使用spark streaming读取这些数据并对其进行处理。
为了对机器学习和异常检测的数据进行预处理,我想根据一些过滤器参数对数据流进行分割。到目前为止,我已经了解到数据流本身不能分裂成几个流。
我主要面临的问题是,许多算法(如kmeans)只接收连续数据,而不接收离散数据,如url或其他字符串。
我的理想要求是:
从Kafka读取数据,并根据我读取的内容生成字符串列表
基于该字符串列表生成多个流-(拆分流、筛选流或任何最佳实践)
使用这些流为每个流训练不同的模型,以获得一个基线,然后将随后出现的所有内容与基线进行比较
我很乐意得到任何关于如何解决我的问题的建议。我无法想象spark中没有涵盖这个场景-但是直到现在我还没有找到一个可行的解决方案。

mzillmmw

mzillmmw1#

我认为使用过滤器和Map从原始数据创建派生数据流就足够了:

val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))

注意这些 filter 以及 map 这些步骤可以结合在一起 collect 步骤(不要与无参数rdd.collect混淆,它将数据传送到驱动程序!!!)

val featuresStream = originalDStream.transform(rdd => 
  rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)

我们还可以将一组动态过滤的数据流保存到某个集合中。这里我们使用一个包含我们用来过滤的键的Map:

originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String] 
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))
// do something with the different DStreams in this structure ...

这些代码片段是用实际逻辑完成的代码示例。

相关问题