Apache Spark 使用Python多处理进程和队列中止函数

axkjgtzd  于 6个月前  发布在  Apache
关注(0)|答案(1)|浏览(65)

我有一个名为some_function(a,b)的函数,它接受了两个参数a和B,并做了一些密集的事情,然后将一个Spark框架存储为输出。我想在some_function ->上设置超时,如果它花费的时间超过5分钟(300秒),我想中止,并移动到主for循环中的下一个'i'。下面的方法似乎不起作用,some_function(a,b)似乎永远运行,而不是在5分钟后停止。可能队列有问题?

from multiprocessing import Process, Queue

def some_function(a,b):
//do some intensive process with a and b to generate spark dataframe

    //store Spark dataframe output in Q
    Q.put(some_spark_df)
    

def main(): 
    for i in range(0,10): 

            Q = Queue() 

            a = //something
            b = //something

            p = Process(target=some_function, args = (a, b))

            p.start()

            #timeout value in seconds (300s = 5min)
            p.join(300)

            #if still running after above time
            if p.is_alive():
                #terminate
                p.terminate()
                p.join()
            
            //get output from some_function(a,b)
            result_df = Q.get()
hfsqlsce

hfsqlsce1#

主进程无法成功join一个子进程,该子进程在将数据从队列中取出之前将数据放入multiprocessing.Queue示例。主进程将无限期阻塞,因为子进程在队列清空之前无法终止。
如果我们可以假设子进程在将结果放入队列时基本上已经完成了它的处理,因此已经终止或即将终止,那么主进程可以通过执行带有 timeout 值的get来检查结果是否已经放入队列。如果它能够检索数据,我们知道我们可以安全地join子进程。否则,我们对子进程调用terminate,然后我们可以安全地join子进程,而不管在对Queue.getProcess.terminate()的不成功调用之间是否有数据被放入队列(也就是说,不应该有竞态条件)。
或者,您可以 * 使用 * 托管 * 队列示例,它不会遇到与multiprocessing.Queue示例相同的问题,因为它试图join一个子进程,该子进程已将数据放置在队列上,但由于队列在另一个进程(调用multiprocessing.Manager()创建的进程)中退出,因此尚未检索到该数据。但性能可能会受到影响。

from multiprocessing import Process, Queue
from queue import Empty

def some_function(queue, a, b):
    # do some intensive process with a and b to generate spark dataframe
    import time

    if a == 1:
        time.sleep(5) # to force a timeout

    # store Spark dataframe output in queue
    queue.put(a + b) # for demo purposes

def main():
    for i in range(0, 3): # three times for demo purposes

        queue = Queue()

        a = i
        b = 2

        p = Process(target=some_function, args = (queue, a, b))
        p.start()
        # timeout value in seconds (3 seconds for demo purposes)
        try:
            result_df = queue.get(timeout=3)
        except Empty:
            # child process has not completed:
            p.terminate()
            p.join()
            print('task', i, 'timed out')
        else:
            p.join() # for demo purposes
            print('task', i, 'result is', result_df)

if __name__ == '__main__':
    main()

图纸:

task 0 result is 2
task 1 timed out
task 2 result is 4

相关问题