Memory errors batch inserting csv data into SQL Server using SQLAlchemy, pyodbc and MSSQL

jv4diomz  于 5个月前  发布在  SQL Server
关注(0)|答案(1)|浏览(59)

As described in the title I am using SQLAlchemy and PYODBC in a python script to insert large csv files (up to 14GB) into a locally hosted SQL Server database.

I know that I cannot host a 14gb dataframe in memory so I am using the chunk feature in pandas to run batch inserts and have experimented with batch sizes as small as 100 rows which easily fits into memory.

I have a feeling the memory error is related to the SQL side of things. To minimize load processing I do not have any indexes on the tables I am inserting into (hash tables). No matter the batch size I am running out of memory at the same point in the loading process.

What am I overlooking? Should I be flushing the memory somehow? Or is SQL Server waiting to commit the transaction until the connection closes?

Here is my current code:

`import os
import glob
import traceback

import pandas as pd
import pyodbc
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm

# Replace the following variables with your database connection details
DB_USERNAME = '-----'
DB_PASSWORD = '-----'
DB_HOST = '------'
DB_NAME = '------'

DATA_DIRECTORY = "---------"
ERRORS_DIRECTORY = os.path.join(DATA_DIRECTORY, "errors")

def create_database_engine():
    engine = create_engine(f"mssql+pyodbc://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True )
    return engine

def batch_insert_to_database(engine, data, table, error_log_file):
    # Insert the data into the database in batches
    # The first batch will DROP ANY EXISTING TABLE IN THE DATABASE WITH THE SAME NAME
    try:
        data.to_sql(table, con=engine, index=False, if_exists='append')
    except SQLAlchemyError as e:
        error_message = f"Error during batch insertion: {e}"
        print("error encountered")
        
        with open(error_log_file, 'a') as error_log:
            error_log.write(error_message + '\n')
            traceback.print_exc(file=error_log)

        return False
    return True

def load_table_data(csv, table, errors_directory):
    
    print(f"Beginning load for {table}")
    
    # Create a database engine
    engine = create_database_engine()

    # Create an empty errors DataFrame to store failed batches
    errors_df = pd.DataFrame()

    # Batch size for insertion
    batch_size = 100

    # Initialize tqdm for progress tracking
    progress_bar = tqdm(total=0, desc="Processing")
    
    with engine.connect() as connection:
        truncate_command = text(f"TRUNCATE TABLE [dbo].[{table}]")
        connection.execute(truncate_command)

    error_report_file = os.path.join(errors_directory, f"errors_{table}.txt")
    # Read data from the CSV file in batches
    for batch_data in pd.read_csv(csv, chunksize=batch_size):
        # Try to insert the batch into the database
        success = batch_insert_to_database(engine, batch_data, table, error_report_file)

        # If the batch insertion fails, add the batch to the errors DataFrame
        if not success:
            errors_df = pd.concat([errors_df, batch_data])

        # Update the progress bar
        progress_bar.update(len(batch_data))

    # Close the progress bar
    progress_bar.close()

    error_data_file = os.path.join(errors_directory, f"errors_{table}.csv")
    # Save the errors DataFrame to a CSV file
    if not errors_df.empty:
        errors_df.to_csv(error_data_file, index=False)
        print(f"Errors saved to {error_data_file}")

def main():
    pattern = os.path.join(DATA_DIRECTORY, '**', '*.gz')
    gz_files = glob.glob(pattern, recursive=True)
    tables = [[file, file.split(os.path.sep)[-1].split(".")[0]] for file in gz_files]
    
    for table_data in tables:
        print(table_data)
        load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)

if __name__ == "__main__":
    main()`

Here is the stack trace:

Traceback (most recent call last):
  File "C:\...\main.py", line 97, in <module>
    main()
  File "C:\...\main.py", line 94, in main
    load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)
  File "C:\...\main.py", line 66, in load_table_data
    success = batch_insert_to_database(engine, batch_data, table, error_report_file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\main.py", line 29, in batch_insert_to_database
    data.to_sql(table, con=engine, index=False, if_exists='append')
  File "C:\...\pandas\core\generic.py", line 3008, in to_sql
    return sql.to_sql(
           ^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 788, in to_sql
    return pandas_sql.to_sql(
           ^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 1958, in to_sql
    total_inserted = sql_engine.insert_records(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 1498, in insert_records
    return table.insert(chunksize=chunksize, method=method)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 1059, in insert
    num_inserted = exec_insert(conn, keys, chunk_iter)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\pandas\io\sql.py", line 951, in _execute_insert
    result = conn.execute(self.table.insert(), data)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1412, in execute
    return meth(
           ^^^^^
  File "C:\...\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1635, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1844, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\sqlalchemy\engine\base.py", line 1984, in _exec_single_context
    self._handle_dbapi_exception(
  File "C:\...\sqlalchemy\engine\base.py", line 2342, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "C:\...\sqlalchemy\engine\base.py", line 1934, in _exec_single_context
    self.dialect.do_executemany(
  File "C:\...\sqlalchemy\dialects\mssql\pyodbc.py", line 716, in do_executemany
    super().do_executemany(cursor, statement, parameters, context=context)
  File "C:\...\sqlalchemy\engine\default.py", line 918, in do_executemany
    cursor.executemany(statement, parameters)

And the error message is simply "MemoryError".

Further investigation revealed it is a specific file that is causing the error even if I don't load prior ones. I am starting to think it might be a bad error message. Thank you for all the suggestions. I don't see how there could be a memory error retrieving 100 rows with 6 columns from a csv. I will post if/when I find it.

dwbf0jvd

dwbf0jvd1#

I don't know a lot of sqlserver but maybe try this to see if the entire import is trying to pile up in a single transaction, there might be a better way but this would be informative, (this would commit every batch):

def batch_insert_to_database(engine, data, table, error_log_file):
    with engine.begin() as conn:
        data.to_sql(table, con=conn, index=False, if_exists='append')
    return True

相关问题