Pything线程,如何让3个线程读取一个csv文件

ycggw6v2  于 7个月前  发布在  其他
关注(0)|答案(2)|浏览(69)

我怎么能让3个线程读取一个csv文件,但有一个问题我不能弄清楚,例如在这个列表中:

1
2
3
4
5
6

字符串
我怎么能让线程读它像这样:

thread 1: 1
thread 2: 2
thread 3: 3
thread 1: 4
thread 2: 5
thread 3: 6


我希望他们继续前进,现在发生的事情是这样的:

thread 1: 1
thread 1: 2
thread 1: 3
thread 1: 4
thread 1: 5
thread 1: 6

thread 2: 1
thread 2: 2
thread 2: 3
thread 2: 4
thread 2: 5
thread 2: 6
etc...


我想让他们一起一排一排地读,你们能给予我搭把手吗?
这是我的代码,我试图弄清楚如何做上面的解释,你们有什么想法吗?

import threading
import csv

# Function to read and print rows from the CSV file
def read_csv(filename):
    with open(filename, 'r') as csv_file:
        reader = csv.reader(csv_file)
        for row in reader:
            print(f"Thread {threading.current_thread().name} - CSV Row: {row}")

# Specify the CSV file
csv_filename = 'your_csv_file.csv'

# Create and start three threads
threads = []
for i in range(3):
    thread = threading.Thread(target=read_csv, args=(csv_filename,))
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()

r7xajy2e

r7xajy2e1#

这实际上取决于一些条件:
1.如果文件中的每一行都有一个已知的字符数(你提供的例子就是这样),你可以使用一些基本的算术来计算出应该从文件中读取哪些字节(但是如果你的行是[8,9,10,11],这就不起作用了,因为有些行有不同的字符数)。
1.如果文件相对较小(<1GB),并且您希望保持对它的阅读,则可以执行一次传递来计算每行的字节偏移量,然后将这些偏移量传递给线程以执行读取
1.如果你的文件比较大(>10GB),你可以将文件拆分成N个文件。(N是线程的数量)字节块,并将每个块传递给线程。然而,这几乎可以保证将文件拆分到一行中间的某个地方,因此每个线程需要开始寻找换行符,将未处理的字节放入队列,正常处理其余的输入,如果最后还有一些输入,它们应该再次传递到队列中,以便稍后处理。
这可能有助于更好地解释我的意思:您提供的示例每行有3个字符(假设您使用windows):

1\r\n
2\r\n
3\r\n
4\r\n
5\r\n
6\r\n
7\r\n

字符串
其中\r\n是回车符和换行符,它们是不可见的,但这些是用来确定换行符是什么的字符。所以我们总共有7*3=21个字节要处理。如果我们想使用2个线程来处理这个,其中一个线程得到11个字节,另一个线程得到10个字节,这意味着这些将是线程的输入:

Thread 1: 1\r\n2\r\n3\r\n4\r
Thread 2: \n5\r\n6\r\n7\r\n


线程1开始查找换行符并传递1\r\n然后,它处理2\r\n3\r\n,因为它可以确定这些是完整的行,并传递其余的行。(4\r)到队列。线程2做类似的事情:将\n传递到队列,处理5\r\n6\r\n7\r\n,然后你需要另一个工作线程来连接这些未处理的字节以形成完整的行,然后处理这些行。
如果您给予更详细的描述,说明您要实现的目标以及这里适用的情况,我可以用更完整的代码示例来更新答案。
我想我的回答被误解了,所以这里有一个更新的版本,实际上在线程之间平均分配输入:

import time
import math
from threading import Thread, Lock
from os.path import getsize

filename = "test.txt"
number_of_lines = 100
number_of_threads = 5
# This is only required for thread safe printing;
# In a real worker thread you will never do this since both locking and printing decrease preformance
lock = Lock()

with open(filename , "w") as f:
    for num in range(number_of_lines):
        f.write(f"{num}\n")
        
file_size = getsize(filename) # On windows this will be number_of_lines*3 and on linux it will be *2
ceil = int(math.ceil(file_size/number_of_threads))
offsets = [0]
for i in range(number_of_threads-1):
    offsets.append((i+1)*ceil)
offsets.append(file_size)
print(f"Wrote {file_size} bytes to {filename}")
print(f"Byte offsets passed to threads: {offsets}")
print("#"*20)

#Buffer size is how many bytes will be read in each iteration,
# in reality this should multiple KBs, but I use a small number for demonstration
def worker(filename, offset, count, thread_number, lock, buffer_size=10):
    with open(filename , "rb") as f:
        f.seek(offset)
        # Read the input in buffer_size chunks
        for _ in range(count//buffer_size):
            buffer = f.read(buffer_size)
            # go through buffer and find the first new line
            # do some processing
            with lock:
                print(f"worker {thread_number} read: {buffer}")
        buffer = f.read(count%buffer_size)
        # check if you have unprocessable data in your buffer and pass it somewhere else
        with lock:
            print(f"worker {thread_number} read: {buffer}")
        
threads = [Thread(target=worker, args=(filename, offsets[i], offsets[i+1]-offsets[i], i, lock)) for i in range(number_of_threads)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()


我很清楚这在实践中是非常低效的,因为:
1.使用Python很慢,如果你想要性能,你需要编写自己的C++扩展
1.文件大小和缓冲区大小是如此之小,线程增加了太多的开销,这意味着一次读取文件要快得多。
这并不是要提供一个完整的例子,而是要说明这是可以做到的,而且你并不像其他人建议的那样绑定到一个“IO线程”。显然,在每个线程中阅读整个文件是没有意义的,但是你可以很容易地使用seek()read()函数来读取给定偏移量和给定字节数的文件。
我不得不在一个项目上使用非常类似的技巧,该项目只需要使用CSV文件而不是DB来运行CSV上的查询,并且文件大小只有几GB。像这样分割输入可以非常快速地在数据集上创建索引,然后用于运行实际的查询。

t2a7ltrp

t2a7ltrp2#

您根本不会这样做-该文件是一个单一的资源,您必须传递一个令牌,该令牌阻止除一个线程之外的所有线程进行阅读。
相反,你会有一个“IO线程”,它负责阅读文件,并将读取的行放入队列的头部-如果你真的想像你的问题中所示的那样“循环”行,每个“工作线程”都有一个自己的队列,它从其中读取。如果队列中没有任何东西,该阅读将被阻塞,直到IO线程将一行插入队列。

相关问题