python多处理逐渐增加内存,直到它运行

k3fezbri  于 2021-07-14  发布在  Java
关注(0)|答案(0)|浏览(148)

我有一个包含多个模块的python程序。它们是这样的:
作业类,它是入口点,管理程序的总体流程
任务类,它是要在给定数据上运行的任务的基类。许多专门为不同数据列上的不同计算类型创建的子任务类都是从task类派生的。假设数据中有10列,每个列都有自己的任务来进行一些处理。例如,currencyconvertertask可以使用“price”列返回本地货币值等等。
许多其他模块,如获取数据的连接器、utils模块等,我认为与这个问题无关。
程序的一般流程是:从数据库中连续获取数据->处理数据->将更新后的数据写回数据库。
我决定在多处理中这样做,因为任务相对简单。它们大多做一些基本的算术或逻辑运算,在一个进程中运行需要很长时间,特别是从一个大数据库中获取数据,按顺序处理非常慢。
所以多处理(mp)代码看起来是这样的(我不能公开整个文件,所以我正在编写一个简化版本,没有包含的部分在这里不相关。我已经通过注解进行了测试,因此这是实际代码的准确表示):

class Job():
    def __init__():
        block_size = 100 # process 100 rows at a time
        some_query = "SELECT * IF A > B" # some query to filter data from db

    def data_getter():
        # continusouly get data from the db and put it into a queue in blocks
        cursor = Connector.get_data(some_query)
        block = []

        for item in cursor:
            block.append(item)
            if len(block) ==block_size:
                data_queue.put(data)
                block = []

        data_queue.put(None) # this will indicate the worker processors when to stop

    def monitor():
        # continuously monitor the system stats
        timer = Timer()
        while (True):
            if timer.time_taken >= 60: # log some stats every 60 seconds
                print(utils.system_stats())
                timer.reset()

    def task_runner():
        while True:
            # get data from the queue
            # if there's no data, break out of loop
            data = data_queue.get()
            if data is None:
                break

            # run task one by one
            for task in tasks:
                task.do_something(data)

    def run():
        # queue to put data for processing
        data_queue = mp.Queue()

        # start a process for reading data from db
        dg = mp.Process(target=self.data_getter).start()

        # start a process for monitoring system stats
        mon = mp.Process(target=self.monitor).start()

        # get a list of tasks to run
        tasks = [t for t in taskmodule.get_subtasks()]

        workers = []
        # start 4 processes to do the actual processing
        for _ in range(4):
            worker = mp.Process(target=task_runner)
            worker.start()
            workers.append(worker)

        for w in workers:
            w.join()

        mon.terminate() # terminate the monitor process
        dg.terminate() # end the data getting process

if __name__ == "__main__":
  job = Job()
  job.run()

整个程序的运行方式如下: python3 runjob.py 预期行为:连续的数据流进入 data_queue 每个辅助进程都会获取数据和进程,直到不再有来自游标的数据为止,此时辅助进程完成,整个程序完成。
这是预期的工作,但不期望的是,系统内存使用率不断攀升,直到系统崩溃。这个 data 我得到这里不是复制任何地方(至少有意)。我希望在整个程序中内存使用是稳定的。长度 data_queue 很少超过1或2,因为进程足够快,可以在可用时获取数据,所以队列不会容纳太多数据。
我猜这里启动的所有进程都是长时间运行的,这与此有关。虽然我可以打印pid,如果我按照pid top 命令数据获取程序和监视进程的内存使用率不要超过2%。4个工作进程也不会占用大量内存。整个过程的主要过程也是如此。有一个未说明的进程占用了ram的20%+。它让我很烦,我不知道它是什么。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题