加快从Telegram下载数据并将其保存到CSV文件的代码

shstlldc  于 7个月前  发布在  其他
关注(0)|答案(1)|浏览(59)

我实现了以下函数,我想尽可能加快速度:

async def async_update_database(self, source_id: str, source_hash: str, message_id_1: str, message_id_n: str, file: str):

        entity = types.InputPeerChannel(int(source_id), int(source_hash))

        n_messages = 0
        n_comments = 0

        with open(file, 'a', encoding = "utf-8") as fout:
            for message_id in range(int(message_id_1), int(message_id_n) + 1):
                message = await self.client.get_messages(entity, ids = message_id)
                if message == None or message.message == "" or message.replies == None or message.replies.replies == 0:
                    continue

                fout.write("M," + str(int(message.date.timestamp())) + ',' + str(message_id) + ',' + repr(message.message) + '\n')
                n_messages += 1

                async for comment in self.client.iter_messages(entity, reply_to = message_id):
                    if isinstance(comment.sender, types.User) and not comment.sender.bot:
                        fout.write("C," + str(int(comment.date.timestamp())) + ',' + str(comment.sender.id) + ',' + repr(comment.message) + '\n')
                        n_comments += 1

        return (str(n_messages) + " " + str(n_comments))

字符串
该函数从外部代码调用,从Telegram频道请求帖子和评论的数据,下载它们并将其以CSV格式保存到文件中。有很多调用,所以写入文件被设置为附加到结束模式。下载卷:对该函数的一次调用可以下载10'000个帖子,每个帖子中有1000条评论,即1000000行被写入文件。
你能用这段代码做什么?我不是PythonMaven,我可能会在这里犯一些愚蠢的错误。从我看来不是很理想的地方:1)在这种形式下,写到文件末尾是否有效?2)异步调用是否有任何问题?3)我是否正确地使用了Telethon库中的函数?

0tdrvxhp

0tdrvxhp1#

您的代码目前实际上没有使用async的任何优点(当只考虑此端点时)。通过在for循环中调用await,或稍后调用async for,您可以有效地同步处理这些async调用,并且每个调用将仅在其他调用完成后执行。虽然这仍然有助于允许其他FastAPI端点中的async调用,你在这里没有做任何async优化。
你可以通过使用asyncio.gather()来提高性能,并传递一个函数列表以“并行”异步执行(这不是真正的并行,但它是一个足够好的类比)。围绕await的代码的第一部分将变成这样:

import functools

message_ids = list(range(int(message_id_1), int(message_id_n) + 1))
# create a partial function with "entity" already passed
get_messages_func = functools.partial(self.client.get_messages, entity)
messages = await asyncio.gather(get_messages_func, message_ids)

字符串
消息应该以消息列表的形式结束。这里的区别在于,client.get_messages调用在开始之前不会等待前一个调用 return,而是在前一个调用 starts 时立即启动,并在收到响应时立即返回。将这种方法应用于第二个client.get_messages调用也可以提高性能。
我能想到的主要问题是你的程序内存不足,因为它不会“同时”处理所有的请求和它们的返回值,这意味着在解析新的响应之前没有时间垃圾收集旧的响应。你可以通过设置一次处理多少请求(gather)来解决这个问题。

相关问题