spark结构化流媒体处理每一行

cbwuti44  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(483)

我在spark 2.1.1中使用结构化流媒体。我需要对传入的消息应用一些业务逻辑(来自kafka源代码)。
本质上,我需要提取消息,获取一些键值,在hbase中查找它们,并对数据集执行更多的业务逻辑。最终结果是一个字符串消息,需要将其写入另一个kafka队列。
然而,由于传入消息的抽象是一个Dataframe(无界表结构流),因此我必须遍历在触发器执行期间接收到的数据集 mapPartitions (由于hbase客户端不可序列化而导致的分区)。
在我的流程中,我需要遍历每一行,以便为同一行执行业务流程。
有没有更好的方法可以帮助我避免 dataFrame.mapPartitions 打电话?我感觉到它的连续性和迭代性!!
结构化流基本上迫使我在业务流程之外生成一个输出Dataframe,而没有一个可以开始。我还可以使用什么样的设计模式来实现我的最终目标?
你能推荐另一种方法吗?

bfrts1fy

bfrts1fy1#

当您在spark中谈到使用Dataframe时,从广义上讲,您可以做以下三件事之一:a)生成Dataframeb)转换Dataframec)使用Dataframe
在结构化流中,使用数据源生成流Dataframe。通常使用公开的sparksession.readstream方法创建源。此方法返回一个datastreamreader,其中有多个方法用于读取各种输入。所有这些都返回一个Dataframe。它在内部创建一个数据源。spark允许您实现自己的数据源,但他们建议不要这样做,因为从2.2开始,该接口被认为是实验性的
您主要使用map或reduce或sparksql来转换Dataframe。有不同风格的map(map、mappartition、mapparitionwithindex)等,它们基本上都取一行并返回一行。spark在内部完成对map方法调用的并行化工作。它对数据进行分区,将数据分布在集群上的执行器上,并在执行器中调用map方法。你不需要担心并行性。它建在引擎盖下面。Map分区不是“连续的”。是的,一个分区内的行是按顺序执行的,但多个分区是并行执行的。通过对Dataframe进行分区,可以很容易地控制并行度。您有5个分区,您将有5个进程并行运行。你有200个,如果你有200个内核,你可以让其中的200个并行运行
请注意,没有什么可以阻止您转到管理转换内部状态的外部系统。但是,变换应该是幂等的。给定一组输入,它们应该始终生成相同的输出,并随着时间的推移使系统处于相同的状态。如果您正在与转换中的外部系统交谈,这可能会很困难。结构化流至少提供一次保证。这意味着同一行可能会被多次转换。所以,如果你正在做一些像给银行账户加钱的事情,你可能会发现你已经给一些账户加了两次同样的钱。
数据被接收器消耗。通常,通过调用dataframe上的format方法,然后调用start来添加接收器。structuredstreaming有一些内置接收器(除了一个外)或多或少都是无用的。您可以创建自定义接收器,但也不建议这样做,因为接口是实验性的。唯一有用的接收器是您将实现的。它被称为foreachsink。spark将为分区中的所有行调用for each sink。您可以对这些行执行任何操作,包括将其写入hbase。请注意,由于结构化流至少有一次的性质,同一行可能会多次馈送到foreachsink。您需要以幂等的方式实现它。此外,如果您有多个接收器,数据将并行写入接收器。您无法控制接收器的调用顺序。一个接收器正在从一个微批次获取数据,而另一个接收器仍在处理前一个微批次的数据。从本质上说,汇最终是一致的,而不是立即一致的。
一般来说,构建代码的最干净的方法是避免进入转换内部的外部系统。您的转换应该纯粹地转换Dataframe中的数据。如果要从hbase获取数据,请将其放入Dataframe中,将其与流Dataframe连接,然后对其进行转换。这是因为当你进入外部系统时,很难扩展。您希望通过增加Dataframe上的分区和添加节点来扩展转换。但是,与外部系统通信的节点太多会增加外部系统的负载并导致瓶颈,将转换与数据检索分离可以让您独立地扩展它们。
但是!!!!这里有大但是。。。。。。
1) 当您谈到结构化流时,无法实现一个源,该源可以根据输入中的数据有选择地从hbase获取数据。必须在map(-like)方法中执行此操作。所以,在我看来,如果hbase中的数据发生了变化,或者有很多数据不想保存在内存中,那么您所拥有的就非常好了。如果hbase中的数据很小且不变,那么最好将其读入批处理Dataframe,缓存它,然后将其与流Dataframe连接起来。spark将把所有数据加载到自己的内存/磁盘存储器中,并保存在那里。如果您的数据很小并且变化非常频繁,最好在Dataframe中读取它,不要缓存它并将其与流Dataframe连接。spark将在每次运行微批处理时从hbase加载数据。
2) 无法命令执行两个单独的接收器。因此,如果您的需求要求您写入数据库,并写入kafka,并且您希望保证在数据库中提交kafka中的行之后写入该行,那么唯一的方法是a)为每个接收器在a中进行两次写入。b) 在一个类似于map的函数中写入一个系统,在for each sink中写入另一个系统
不幸的是,如果您有一个要求,需要从流源读取数据,将其与批处理源中的数据连接起来,进行转换,将其写入数据库,调用api,从api获取结果,并将api的结果写入kafka,这些操作必须按准确的顺序进行,那么唯一的方法就是在转换组件中实现sink逻辑。必须确保在单独的Map函数中保持逻辑的独立性,以便以最佳方式并行化它们。
此外,没有好的方法可以知道应用程序何时完全处理了一个微批处理,特别是在您有多个接收器的情况下

相关问题