Trigger mode "continuous" in Python (Py)Spark Streaming 3.0.1

Asked by b1zrtrql on 2021-07-13 in Spark

I am using PySpark v3.0.1 (the latest version as of today) to perform some simple streaming operations. I work with small batches rather than bulk streaming.
Looking at the PySpark streaming inspector (web UI), I can see that the "addBatch" step accounts for most of the overhead on these small jobs, so I decided to switch to the "continuous trigger" mode to get rid of that cost:
(the light yellow in the chart is the addBatch time, >80% of the total: https://ibb.co/7r6chgj)
Note: my simple job uses the foreach function of the streaming API.
To enable continuous trigger mode, I took 3 steps:
Define the class that will handle the foreach calls:

class ForeachWriter:
    """Minimal foreach sink handler; open/process/close are called for each partition."""

    def __init__(self, *args):
        print("initialized")

    def open(self, partition_id, epoch_id=0):
        # Returning True means the rows of this partition should be processed.
        return True

    def process(self, row):
        # No-op in this simple job; the real per-row work would go here.
        pass

    def close(self, error):
        pass

writer = ForeachWriter()

Create the structured write stream:

ssc, spark = createContext(...)   # my own setup helper
lines = ssc.load()
ws = lines.writeStream

ws.foreach(writer)

Turn on the continuous trigger:

ws = ws.trigger(continuous="1 second")
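
For completeness, the three steps put together have roughly this shape (a sketch only: the rate source stands in for my real source and for my createContext helper):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("continuous-foreach").getOrCreate()

# Stand-in streaming source; my real job loads its source via createContext().
lines = (spark.readStream
         .format("rate")
         .option("rowsPerSecond", 10)
         .load())

writer = ForeachWriter()

query = (lines.writeStream
         .foreach(writer)                  # steps 1 + 2
         .trigger(continuous="1 second")   # step 3
         .start())

query.awaitTermination()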

Now the query is built correctly, but the job keeps failing because of this runtime exception:
"Could not get batch id from TaskContext"
which is raised by this method: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/streaming.html#datastreamwriter.foreach
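
For context, the check that raises this exception in the linked foreach implementation looks roughly like this (abridged; the streaming.sql.batchId local property is apparently only set for micro-batch triggers, so in continuous mode the else branch is always hit):

def func_with_open_process_close(partition_id, iterator):
    # Abridged from DataStreamWriter.foreach in PySpark 3.0.x (see the link above).
    epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
    if epoch_id:
        epoch_id = int(epoch_id)
    else:
        raise Exception("Could not get batch id from TaskContext")
    # ... the open()/process()/close() handling follows ...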
Proposed workaround
So I overrode the foreach() method by creating a custom class, to avoid triggering the exception above:

from pyspark.sql.streaming import DataStreamWriter

class CustomDataStreamWriter(DataStreamWriter):

    def __init__(self,dsw):
        self.dsw = dsw

    # .... override all other methods

    def foreachBatch(self, func):
        return self.dsw.foreachBatch(func)

    def foreach(self, f):
        from pyspark.rdd import _wrap_function
        from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
        from pyspark.taskcontext import TaskContext

        if callable(f):
            # The provided object is a callable function that is supposed to be called on each row.
            # Construct a function that takes an iterator and calls the provided function on each
            # row.
            def func_without_process(_, iterator):
                for x in iterator:
                    f(x)
                return iter([])

            func = func_without_process

        else:
            # The provided object is not a callable function. Then it is expected to have a
            # 'process(row)' method, and optional 'open(partition_id, epoch_id)' and
            # 'close(error)' methods.

            if not hasattr(f, 'process'):
                raise Exception("Provided object does not have a 'process' method")

            if not callable(getattr(f, 'process')):
                raise Exception("Attribute 'process' in provided object is not callable")

            def doesMethodExist(method_name):
                exists = hasattr(f, method_name)
                if exists and not callable(getattr(f, method_name)):
                    raise Exception(
                        "Attribute '%s' in provided object is not callable" % method_name)
                return exists

            open_exists = doesMethodExist('open')
            close_exists = doesMethodExist('close')

            def func_with_open_process_close(partition_id, iterator):
                epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
                if epoch_id:
                    epoch_id = int(epoch_id)
                else:
                    # The stock implementation raises here; in continuous mode the
                    # batch id property is never set, so fall back to epoch 0 instead.
                    epoch_id = 0
                    # raise Exception("Could not get batch id from TaskContext")

                # Check if the data should be processed
                should_process = True
                if open_exists:
                    should_process = f.open(partition_id, epoch_id)

                error = None

                try:
                    if should_process:
                        for x in iterator:
                            f.process(x)
                except Exception as ex:
                    error = ex
                finally:
                    if close_exists:
                        f.close(error)
                    if error:
                        raise error

                return iter([])

            func = func_with_open_process_close

        serializer = AutoBatchedSerializer(PickleSerializer())
        dsw = self.dsw
        wrapped_func = _wrap_function(dsw._spark._sc, func, serializer, serializer)
        jForeachWriter = \
            dsw._spark._sc._jvm.org.apache.spark.sql.execution.python.PythonForeachWriter(
                wrapped_func, dsw._df._jdf.schema())
        dsw._jwrite.foreach(jForeachWriter)
        return dsw

Then I used it right after step 2 above and before step 3:

ws = CustomDataStreamWriter(ws)
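
Spelled out, and assuming the wrapper is in place before foreach() is called (so that my override, not the stock one, builds the PythonForeachWriter) and that trigger()/start() simply delegate to the wrapped writer per the "override all other methods" placeholder, the revised sequence is roughly:

ws = lines.writeStream                   # step 2, as above
ws = CustomDataStreamWriter(ws)          # wrap before registering the sink
ws.foreach(writer)                       # now goes through the overridden foreach()
ws = ws.trigger(continuous="1 second")   # step 3
query = ws.start()
query.awaitTermination()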

I re-ran the script. This time it loaded and initialized correctly in Spark, and I can see a never-ending job in the PySpark web UI (which should be the expected behaviour in continuous trigger mode).
However, the open/process/close callbacks of the foreach handler are never invoked (fired), so I would like to know whether anyone else has found a good workaround, or whether I have to wait for the next Spark release to fix this?
As you can imagine, such a workaround would greatly improve the performance of my application, so any help is much appreciated.

No answers yet.

