合并100,000个CSV文件并在每次迭代中添加缺失信息

drnojrws  于 5个月前  发布在  其他
关注(0)|答案(1)|浏览(75)

我正在写一段代码,它把大约100,000个文本文件放在一起,并添加丢失的数据。每次迭代调用pandas df都很慢,有更推荐的方法吗?

for root, dirs, files in os.walk(resultsDir):
    for file in files:
        if file.endswith('.txt'):
            resultsFile = os.path.relpath(os.path.join(root, file))
            df = pd.read_csv(resultsFile, delimiter='\t')
            # parsePath data
            df['path'] = resultsFile
            df['resultsFile'] = file
            pathLvls = resultsFile.split(os.sep)
            df['sdCard'] = pathLvls[-3][:5]
            df['site'] = pathLvls[-3][6:]
            df['capture'] = pathLvls[-4]
            if not os.path.isfile(resultsFile):
                print(f' resultsFile {resultsFile} is missing')
            if df.empty:
                breakpoint()
                print(f'{resultsFile} empty results file')
                df['error'] = "empty results file"
            else:
                df2 = pd.concat([df2, df], ignore_index=True)

字符串

s8vozzvw

s8vozzvw1#

解决方案是使用threadingasyncio进行基于i/o的操作。
下面是一个例子:

import pandas as pd
import threading
import queue
import time

file_list = ['file1.csv', 'file2.csv', ...] # List of n file paths

def load_file(file, queue):
    df = pd.read_csv(file)
    queue.put(df)

q = queue.Queue() 

threads = []
for f in file_list:
    t = threading.Thread(target=load_file, args=(f, q)) 
    t.start()
    threads.append(t)

df = pd.DataFrame()
for t in threads:
    t.join()
    df = df.append(q.get())
    
print(df)

字符串
基本上,就像这样:

  • 使用队列在线程之间共享加载的DataFrame
  • 每个文件一个线程(我从来没有尝试过100,000线程)
  • 加入所有线程以等待完成

您需要使用os.walk()来填充列表等。

相关问题