如何在python中将逐块数据库记录写入文件

5ssjco0h  于 2021-06-26  发布在  Impala
关注(0)|答案(1)|浏览(206)

我正在提取 database records 从一张table上,我想 write 把它们变成一个 file . 我的情况是,我的表中有数十亿行。所以我不能一次加载所有的记录,因为我的内存不足。所以我一次只写一条记录。我现在的代码是:

from impala.dbapi import connect

connection = connect()
cur = connection.cursor()
sql = "<SQL Statement>"
cur.execute(sql)
write_file = open("file1.csv",'w')
write_file.write("<column header>")

row = cur.fetchone()
while row is not None:
      write_file.write("\n" + str(row)[1:-1])
      row = cur.fetchone()

write_file.close()

这样做会花费很多时间,因为一次处理一条记录的顺序太多。我想尝试的是一次写一个记录块(比如 1 million 一次记录)。我怎样才能写一本书 block of record 而不是一次一张唱片?
注意:我使用的数据库是 impala update:this is 执行上述代码的分析结果

ncalls  tottime  percall  cumtime  percall filename:lineno(function)

   148032    0.162    0.000    0.563    0.000 TBinaryProtocol.py:109(writeI32)
    49344    0.077    0.000    0.261    0.000 TBinaryProtocol.py:121(writeString)
    49344    0.256    0.000  146.854    0.003 TBinaryProtocol.py:125(readMessageBegin)
    49344    0.011    0.000    0.011    0.000 TBinaryProtocol.py:145(readMessageEnd)
        1    0.000    0.000    0.000    0.000 TBinaryProtocol.py:20(<module>)
   148032    0.257    0.000  146.350    0.001 TBinaryProtocol.py:205(readI32)
    49344    0.084    0.000    0.549    0.000 TBinaryProtocol.py:220(readString)
        1    0.000    0.000    0.000    0.000 TBinaryProtocol.py:226(TBinaryProtocolFactory)
        1    0.000    0.000    0.000    0.000 TBinaryProtocol.py:236(TBinaryProtocolAccelerated)
        1    0.000    0.000    0.000    0.000 TBinaryProtocol.py:24(TBinaryProtocol)
        1    0.000    0.000    0.000    0.000 TBinaryProtocol.py:258(TBinaryProtocolAcceleratedFactory)
        1    0.000    0.000    0.000    0.000 TBinaryProtocol.py:39(__init__)
    49344    0.134    0.000    0.828    0.000 TBinaryProtocol.py:44(writeMessageBegin)
    49344    0.010    0.000    0.010    0.000 TBinaryProtocol.py:54(writeMessageEnd)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2364(GetOperationStatus_args)
      155    0.000    0.000    0.000    0.000 TCLIService.py:2375(__init__)
      155    0.001    0.000    0.002    0.000 TCLIService.py:2398(write)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2425(GetOperationStatus_result)
      155    0.000    0.000    0.000    0.000 TCLIService.py:2435(__init__)
      155    0.001    0.000    0.002    0.000 TCLIService.py:2438(read)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2485(CancelOperation_args)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2546(CancelOperation_result)
        1    0.000    0.000    2.112    2.112 TCLIService.py:259(ExecuteStatement)
        1    0.000    0.000    2.112    2.112 TCLIService.py:275(recv_ExecuteStatement)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2761(write)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2788(GetResultSetMetadata_result)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2798(__init__)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2801(read)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2848(FetchResults_args)
    49186    0.030    0.000    0.030    0.000 TCLIService.py:2859(__init__)
    49186    0.171    0.000    0.418    0.000 TCLIService.py:2882(write)
        1    0.000    0.000    0.000    0.000 TCLIService.py:2909(FetchResults_result)
    49186    0.047    0.000    0.047    0.000 TCLIService.py:2919(__init__)
    49186    0.195    0.000   93.752    0.002 TCLIService.py:2922(read)
      155    0.001    0.000    0.114    0.001 TCLIService.py:499(GetOperationStatus)
      155    0.001    0.000    0.012    0.000 TCLIService.py:507(send_GetOperationStatus)
      155    0.001    0.000    0.101    0.001 TCLIService.py:515(recv_GetOperationStatus)
        1    0.000    0.000    0.002    0.002 TCLIService.py:589(GetResultSetMetadata)
        1    0.000    0.000    0.000    0.000 TCLIService.py:597(send_GetResultSetMetadata)
        1    0.000    0.000    0.002    0.002 TCLIService.py:605(recv_GetResultSetMetadata)
    49186    0.145    0.000  241.626    0.005 TCLIService.py:619(FetchResults)
    49186    0.276    0.000    2.709    0.000 TCLIService.py:627(send_FetchResults)
    49186    0.318    0.000  238.771    0.005 TCLIService.py:635(recv_FetchResults)
        1    0.000    0.000    0.000    0.000 TCLIService.py:770(Processor)
        1    0.001    0.001    0.001    0.001 TCLIService.py:9(<module>)
        1    0.000    0.000    0.000    0.000 TProtocol.py:20(<module>)
        1    0.000    0.000    0.000    0.000 TProtocol.py:23(TProtocolException)
        1    0.000    0.000    0.000    0.000 TProtocol.py:39(TProtocolBase)
        1    0.000    0.000    0.000    0.000 TProtocol.py:419(TProtocolFactory)
        1    0.000    0.000    0.000    0.000 TProtocol.py:42(__init__)
  2245378    4.023    0.000  190.070    0.000 TSocket.py:103(read)
    49344    0.153    0.000    0.925    0.000 TSocket.py:123(write)
    49344    0.012    0.000    0.012    0.000 TSocket.py:137(flush)
        1    0.000    0.000    0.000    0.000 TSocket.py:141(TServerSocket)
        1    0.000    0.000    0.001    0.001 TSocket.py:20(<module>)
        1    0.000    0.000    0.000    0.000 TSocket.py:28(TSocketBase)
        1    0.000    0.000    0.000    0.000 TSocket.py:29(_resolveAddr)
        1    0.000    0.000    0.000    0.000 TSocket.py:47(TSocket)
        1    0.000    0.000    0.000    0.000 TSocket.py:50(__init__)
    49344    0.022    0.000    0.022    0.000 TSocket.py:69(isOpen)
        1    0.000    0.000    0.000    0.000 TSocket.py:72(setTimeout)
        1    0.000    0.000    0.000    0.000 TSocket.py:81(open)
        1    0.000    0.000    0.000    0.000 TTransport.py:103(TServerTransportBase)
        1    0.000    0.000    0.000    0.000 TTransport.py:116(TTransportFactoryBase)
        1    0.000    0.000    0.000    0.000 TTransport.py:123(TBufferedTransportFactory)
        1    0.000    0.000    0.000    0.000 TTransport.py:131(TBufferedTransport)
        1    0.000    0.000    0.000    0.000 TTransport.py:139(__init__)
    49344    0.057    0.000    0.079    0.000 TTransport.py:145(isOpen)
        1    0.000    0.000    0.000    0.000 TTransport.py:148(open)
   197376    0.473    0.000  145.718    0.001 TTransport.py:154(read)
   246720    0.177    0.000    0.376    0.000 TTransport.py:162(write)
    49344    0.177    0.000    1.156    0.000 TTransport.py:165(flush)
    49344    0.015    0.000    0.015    0.000 TTransport.py:173(cstringio_buf)
  2196034    8.376    0.000   57.806    0.000 TTransport.py:177(cstringio_refill)
   197376    0.389    0.000  146.149    0.001 TTransport.py:54(readAll)
       46    0.000    0.000    0.000    0.000 abc.py:89(<genexpr>)
       34    0.000    0.000    0.000    0.000 collections.py:329(<genexpr>)
        4    0.000    0.000    0.000    0.000 collections.py:353(<genexpr>)
        4    0.000    0.000    0.000    0.000 collections.py:355(<genexpr>)
        4    0.000    0.000    0.000    0.000 collections.py:38(__init__)
        1    0.000    0.000    0.000    0.000 collections.py:387(Counter)
       44    0.000    0.000    0.000    0.000 collections.py:54(__setitem__)
       28    0.000    0.000    0.000    0.000 collections.py:73(__iter__)
        3    0.000    0.000    0.000    0.000 decimal.py:3782(__init__)
       27    0.000    0.000    0.000    0.000 decimal.py:3809(<genexpr>)
       27    0.000    0.000    0.000    0.000 decimal.py:3816(<genexpr>)
    49186    0.018    0.000    0.018    0.000 hiveserver2.py:113(buffersize)
    49344    3.269    0.000 1590.514    0.032 hiveserver2.py:116(wrapper)
100680255   29.571    0.000   29.571    0.000 hiveserver2.py:122(has_result_set)
        1    0.000    0.000   39.283   39.283 hiveserver2.py:140(execute)
        1    0.000    0.000    2.112    2.112 hiveserver2.py:142(op)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:146(_get_socket)
        2    0.004    0.002    0.029    0.014 hiveserver2.py:15(<module>)
        1    0.000    0.000   39.283   39.283 hiveserver2.py:154(_execute_sync)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:158(_get_transport)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:166(_reset_state)
        1    0.002    0.002   37.169   37.169 hiveserver2.py:175(_wait_to_finish)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:187(connect_to_impala)
      154    0.000    0.000    0.000    0.000 hiveserver2.py:189(_get_sleep_interval)
 50340127   50.937    0.000 1760.004    0.000 hiveserver2.py:207(fetchone)
        1    0.000    0.000    0.001    0.001 hiveserver2.py:209(open_session)
        1    0.000    0.000    2.112    2.112 hiveserver2.py:226(execute_statement)
        1    0.000    0.000    0.002    0.002 hiveserver2.py:237(get_result_schema)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:24(HiveServer2Connection)
 50340127   52.496    0.000 1693.039    0.000 hiveserver2.py:250(next)
    49186 1153.582    0.023 1584.908    0.032 hiveserver2.py:263(fetch_results)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:30(__init__)
      155    0.002    0.000    0.115    0.001 hiveserver2.py:406(get_operation_status)
        1    0.000    0.000    0.001    0.001 hiveserver2.py:49(cursor)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:66(HiveServer2Cursor)
        1    0.000    0.000    0.000    0.000 hiveserver2.py:71(__init__)
    49344    0.039    0.000    0.039    0.000 hiveserver2.py:76(err_if_rpc_not_ok)
    49186    0.017    0.000    0.017    0.000 hiveserver2.py:89(description)
      391    0.000    0.000    0.000    0.000 sre_parse.py:130(__getitem__)
       42    0.000    0.000    0.000    0.000 sre_parse.py:134(__setitem__)
      145    0.000    0.000    0.000    0.000 sre_parse.py:138(append)
   135/54    0.000    0.000    0.000    0.000 sre_parse.py:140(getwidth)
       12    0.000    0.000    0.000    0.000 sre_parse.py:178(__init__)
     1116    0.001    0.000    0.001    0.000 sre_parse.py:182(__next)
      308    0.000    0.000    0.000    0.000 sre_parse.py:195(match)
      983    0.000    0.000    0.002    0.000 sre_parse.py:201(get)
        1    0.000    0.000    0.000    0.000 sre_parse.py:205(tell)
      114    0.000    0.000    0.000    0.000 sre_parse.py:210(isident)
       21    0.000    0.000    0.000    0.000 sre_parse.py:216(isname)
        2    0.000    0.000    0.000    0.000 sre_parse.py:225(_class_escape)
       32    0.000    0.000    0.000    0.000 sre_parse.py:257(_escape)
    47/12    0.000    0.000    0.004    0.000 sre_parse.py:301(_parse_sub)
    52/13    0.001    0.000    0.004    0.000 sre_parse.py:379(_parse)
       12    0.000    0.000    0.000    0.000 sre_parse.py:67(__init__)
       12    0.000    0.000    0.004    0.000 sre_parse.py:675(parse)
       29    0.000    0.000    0.000    0.000 sre_parse.py:72(opengroup)
       29    0.000    0.000    0.000    0.000 sre_parse.py:83(closegroup)
       98    0.000    0.000    0.000    0.000 sre_parse.py:90(__init__)
   442674    0.315    0.000    0.315    0.000 ttypes.py:2296(__init__)
   590232    0.262    0.000    0.262    0.000 ttypes.py:2380(__init__)
  1032906    2.061    0.000    2.061    0.000 ttypes.py:2644(__init__)
    49186    0.033    0.000    0.033    0.000 ttypes.py:2786(__init__)
    49344    0.050    0.000    0.050    0.000 ttypes.py:2896(__init__)
      155    0.000    0.000    0.000    0.000 ttypes.py:5195(__init__)
      155    0.000    0.000    0.000    0.000 ttypes.py:5266(__init__)
       21    0.000    0.000    0.000    0.000 ttypes.py:540(__init__)
    49186    0.055    0.000    0.055    0.000 ttypes.py:5764(__init__)
        1    0.000    0.000    0.000    0.000 ttypes.py:5840(TFetchResultsResp)
    49186    0.044    0.000    0.044    0.000 ttypes.py:5855(__init__)
        1  286.275  286.275 2121.074 2121.074 test.py:24(func1)
        1    0.001    0.001 2121.112 2121.112 test.py:3(<module>)
       12    0.000    0.000    0.000    0.000 {_sre.compile}
       35    0.000    0.000    0.000    0.000 {_sre.getlower}
   148032    0.134    0.000    0.134    0.000 {_struct.pack}
   148032    0.107    0.000    0.107    0.000 {_struct.unpack}
  2294724    4.074    0.000    4.074    0.000 {cStringIO.StringIO}
      251    0.000    0.000    0.000    0.000 {getattr}
    99230    0.061    0.000    0.061    0.000 {isinstance}
    24/13    0.000    0.000    0.000    0.000 {issubclass}
55672069/55672017    7.519    0.000    7.519    0.000 {len}
    49351    0.061    0.000    0.061    0.000 {max}
        4    0.000    0.000    0.000    0.000 {method '__contains__' of 'frozenset' objects}
       11    0.000    0.000    0.000    0.000 {method '__subclasses__' of 'type' objects}
1107483909  105.411    0.000  105.411    0.000 {method 'append' of 'list' objects}
        4    0.220    0.055    0.220    0.055 {method 'close' of 'file' objects}
        1    0.000    0.000    0.000    0.000 {method 'connect' of '_socket.socket' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 {method 'endswith' of 'str' objects}
    49198    0.249    0.000    0.249    0.000 {method 'extend' of 'list' objects}
    49344    0.021    0.000    0.021    0.000 {method 'getvalue' of 'cStringIO.StringO' objects}
 50340126   32.139    0.000   32.139    0.000 {method 'pop' of 'list' objects}
   246720    0.140    0.000    0.140    0.000 {method 'read' of 'cStringIO.StringI' objects}
       23    0.000    0.000    0.000    0.000 {method 'readline' of 'file' objects}
  2245378  185.259    0.000  185.259    0.000 {method 'recv' of '_socket.socket' objects}
    49344    0.764    0.000    0.764    0.000 {method 'send' of '_socket.socket' objects}
        2    0.000    0.000    0.000    0.000 {method 'setter' of 'property' objects}
        1    0.000    0.000    0.000    0.000 {method 'settimeout' of '_socket.socket' objects}
       45    0.000    0.000    0.000    0.000 {method 'split' of 'str' objects}
       11    0.000    0.000    0.000    0.000 {method 'startswith' of 'str' objects}
       45    0.000    0.000    0.000    0.000 {method 'strip' of 'str' objects}
   246720    0.198    0.000    0.198    0.000 {method 'write' of 'cStringIO.StringO' objects}
 50340130   34.491    0.000   34.491    0.000 {method 'write' of 'file' objects}
      193    0.000    0.000    0.000    0.000 {min}
        4    0.800    0.200    0.800    0.200 {open}
1057142741   84.179    0.000   84.179    0.000 {ord}
        1    0.000    0.000    0.000    0.000 {posix.getcwd}
        1    0.000    0.000    0.000    0.000 {posix.lstat}
        1    0.000    0.000    0.000    0.000 {posix.urandom}
        1    0.000    0.000    0.000    0.000 {repr}
      146    0.000    0.000    0.000    0.000 {setattr}
        2    0.000    0.000    0.000    0.000 {sys._getframe}
        3    0.000    0.000    0.000    0.000 {thread.allocate_lock}
        2    0.000    0.000    0.000    0.000 {thread.get_ident}
    49344   32.932    0.001   93.520    0.002 {thrift.protocol.fastbinary.decode_binary}
    49344    0.184    0.000    0.184    0.000 {thrift.protocol.fastbinary.encode_binary}
      154   37.048    0.241   37.048    0.241 {time.sleep}
        1    0.000    0.000    0.000    0.000 {time.strftime}
      159    0.000    0.000    0.000    0.000 {time.time}
bqf10yzr

bqf10yzr1#

如果您将探查器的输出保存到一个文件中(我将其保存到文件中) a )按第二列排序 tottime (在函数本身花费的时间,不包括被调用的函数)并取前10名(不确定是否给它一个 head 合适):

[max@supernova:~/tmp] $ sort --key=2nr a | head 
    49186 1153.582    0.023 1584.908    0.032 hiveserver2.py:263(fetch_results)
        1  286.275  286.275 2121.074 2121.074 test.py:24(func1)
  2245378  185.259    0.000  185.259    0.000 {method 'recv' of '_socket.socket' objects}
1107483909  105.411    0.000  105.411    0.000 {method 'append' of 'list' objects}
1057142741   84.179    0.000   84.179    0.000 {ord}
 50340127   52.496    0.000 1693.039    0.000 hiveserver2.py:250(next)
 50340127   50.937    0.000 1760.004    0.000 hiveserver2.py:207(fetchone)
      154   37.048    0.241   37.048    0.241 {time.sleep}
 50340130   34.491    0.000   34.491    0.000 {method 'write' of 'file' objects}
    49344   32.932    0.001   93.520    0.002 {thrift.protocol.fastbinary.decode_binary}

解释:
大部分时间花在49186个电话上 hiveserver2.py:263(fetch_results) . 我对那个图书馆不太熟悉,但我想看看是否可以减少通话次数。通常,函数的开头和结尾都会有开销,请求/响应开销等等,因此通过减少调用的数量,可以减少一些开销。可以进行批量结果行获取(即一次获取多行)。
不确定什么 test.py:24(func1) 但它是探查器输出中的第二个热点,尽管只调用了一次。它能做什么循环吗?
下一个是 185.259 时间单位,他们花在2245378个电话 {method 'recv' of '_socket.socket' objects} . 这个数字 recv 电话似乎过多,可能是因为使用 fetchone ,因为它看起来像是通过套接字为每一行发送一个新请求。
下一个 {method 'append' of 'list' objects} ,没有足够的信息从何处调用。
同样适用于 {ord} .
接下来,两个条目 hiveserver2.py:250(next) 以及 hiveserver2.py:207(fetchone) 占总时间的大部分(第4栏)。
考虑到上述解释,这看起来像是 fetchone 是次优的。
而不是打电话 fetchone 我会打电话给你的 fetchall 如果结果集保证适合可用内存,则为 fetchmany ,以便使用尽可能少的请求/批从数据库加载记录集。

相关问题