python pandas to_sql with sqlalchemy : how to speed up exporting to MS SQL?

Solution 1:

I recently had the same problem and felt like adding an answer for others. to_sql seems to send an INSERT query for every row, which makes it really slow. But since pandas 0.24.0 there is a method parameter in pandas.to_sql() where you can define your own insertion function, or just use method='multi' to tell pandas to pass multiple rows in a single INSERT query, which makes it a lot faster.
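To make the "define your own insertion function" part concrete, here is a minimal sketch using the callable signature that pandas documents for the method parameter, (table, conn, keys, data_iter). It runs against an in-memory SQLite database purely as a stand-in for MS SQL; the qmark placeholder style shown also happens to be what pyodbc uses, but check your driver's paramstyle:

```python
import pandas as pd
from sqlalchemy import create_engine

def insert_with_executemany(table, conn, keys, data_iter):
    """Custom insertion callable; pandas invokes it per chunk with the
    documented signature (table, conn, keys, data_iter)."""
    columns = ", ".join(keys)
    placeholders = ", ".join(["?"] * len(keys))  # qmark paramstyle; adjust for your driver
    sql = f"INSERT INTO {table.name} ({columns}) VALUES ({placeholders})"
    # conn is a SQLAlchemy Connection; .connection exposes the raw DB-API one
    conn.connection.cursor().executemany(sql, list(data_iter))

engine = create_engine("sqlite://")  # in-memory stand-in; use your mssql URL
df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df.to_sql("demo", engine, method=insert_with_executemany, index=False)
print(pd.read_sql("SELECT COUNT(*) AS n FROM demo", engine)["n"][0])  # 3
```

The body of the callable is where you would swap in whatever bulk path your driver offers instead of a plain executemany.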

Note that your database may have a parameter limit. In that case you also have to define a chunksize.

So the solution can look as simple as this:

my_data_frame.to_sql('TableName', engine, chunksize=<yourParameterLimit>, method='multi')

If you do not know your database's parameter limit, just try it without the chunksize parameter. It will either run or give you an error telling you your limit.
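For SQL Server specifically, a single statement is capped at 2100 parameters, and method='multi' consumes one parameter per cell (rows × columns), so the chunksize can be derived rather than guessed. A sketch (SQLite stands in for MS SQL here just so the snippet runs anywhere):

```python
import pandas as pd
from sqlalchemy import create_engine

df = pd.DataFrame({"a": range(10), "b": range(10), "c": range(10)})

# SQL Server allows at most 2100 parameters per statement; method='multi'
# uses one parameter per cell, so stay under the cap per chunk:
max_params = 2100
chunksize = max_params // len(df.columns) - 1  # -1 leaves a small safety margin
print(chunksize)  # 699

engine = create_engine("sqlite://")  # stand-in; use your mssql+pyodbc URL
df.to_sql("my_table", engine, chunksize=chunksize, method="multi", index=False)
```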

Solution 2:

The DataFrame.to_sql method generates INSERT statements and sends them to your ODBC connector, which then treats them as regular inserts.

When this is slow, it is not the fault of pandas.

Saving the output of the DataFrame.to_sql method to a file, then replaying that file over an ODBC connector will take the same amount of time.

The proper way of bulk importing data into a database is to generate a CSV file and then use a load command, which in the MS flavour of SQL databases is called BULK INSERT.

For example:

BULK INSERT mydatabase.myschema.mytable
FROM 'mydatadump.csv';

The syntax reference is as follows:

BULK INSERT 
   [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ] 
      FROM 'data_file' 
     [ WITH 
    ( 
   [ [ , ] BATCHSIZE = batch_size ] 
   [ [ , ] CHECK_CONSTRAINTS ] 
   [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ] 
   [ [ , ] DATAFILETYPE = 
      { 'char' | 'native'| 'widechar' | 'widenative' } ] 
   [ [ , ] FIELDTERMINATOR = 'field_terminator' ] 
   [ [ , ] FIRSTROW = first_row ] 
   [ [ , ] FIRE_TRIGGERS ] 
   [ [ , ] FORMATFILE = 'format_file_path' ] 
   [ [ , ] KEEPIDENTITY ] 
   [ [ , ] KEEPNULLS ] 
   [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ] 
   [ [ , ] LASTROW = last_row ] 
   [ [ , ] MAXERRORS = max_errors ] 
   [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ] 
   [ [ , ] ROWS_PER_BATCH = rows_per_batch ] 
   [ [ , ] ROWTERMINATOR = 'row_terminator' ] 
   [ [ , ] TABLOCK ] 
   [ [ , ] ERRORFILE = 'file_name' ] 
    )] 
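Tying this back to pandas: you can dump the DataFrame to CSV and build the BULK INSERT statement from Python, then execute it over any DB-API connection. A hedged sketch, reusing the names from the example above; note that BULK INSERT reads the file on the server, so the path must be reachable from the server's point of view, and the terminators must match what to_csv wrote:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# 1) Dump the frame to a CSV the server can reach (path is illustrative).
csv_path = "mydatadump.csv"
df.to_csv(csv_path, index=False, header=False)

# 2) Build the BULK INSERT statement; terminators must match to_csv's output.
sql = (
    "BULK INSERT mydatabase.myschema.mytable "
    f"FROM '{csv_path}' "
    "WITH (FIELDTERMINATOR = ',', ROWTERMINATOR = '\\n', TABLOCK)"
)
print(sql)

# 3) Execute it over your connection, e.g. with pyodbc:
# import pyodbc
# with pyodbc.connect(conn_str, autocommit=True) as cnxn:
#     cnxn.cursor().execute(sql)
```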

Solution 3:

You can use this; what makes it faster is the method parameter of pandas' to_sql. I hope this helps.

In my experience, this took the runtime from seemingly infinite to about 8 seconds.


import time
import pandas as pd
from sqlalchemy import create_engine

df = pd.read_csv('test.csv')

engine = create_engine(<connection_string>)

start_time = time.time()
df.to_sql('table_name', engine, method='multi', index=False, if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))

Solution 4:

You can use d6tstack, which has fast pandas-to-SQL functionality because it uses native DB import commands. It supports MS SQL, Postgres and MySQL.

uri_psql = 'postgresql+psycopg2://usr:pwd@localhost/db'
d6tstack.utils.pd_to_psql(df, uri_psql, 'table')
uri_mssql = 'mssql+pymssql://usr:pwd@localhost/db'
d6tstack.utils.pd_to_mssql(df, uri_mssql, 'table', 'schema') # experimental

Also useful for importing multiple CSVs with data schema changes and/or preprocessing with pandas before writing to the db; see further down in the examples notebook.

d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), 
    apply_after_read=apply_fun).to_psql_combine(uri_psql, 'table')

Solution 5:

I was running out of time and memory (more than 18 GB allocated for a DataFrame loaded from a 120 MB CSV) with this line:

df.to_sql('my_table', engine, if_exists='replace', method='multi',
          dtype={"text_field": db.String(64), "text_field2": db.String(128),
                 "intfield1": db.Integer(), "intfield2": db.Integer(),
                 "floatfield": db.Float()})

Here is the code that helped me to import and track progress of insertions at the same time:

import sqlalchemy as db
engine = db.create_engine('mysql://user:password@localhost:3306/database_name', echo=False)
connection = engine.connect()
metadata = db.MetaData()

my_table = db.Table('my_table', metadata,
              db.Column('text_field', db.String(64), index=True),
              db.Column('text_field2', db.String(128), index=True),
              db.Column('intfield1', db.Integer()),
              db.Column('intfield2', db.Integer()),
              db.Column('floatfield', db.Float())
             )
metadata.create_all(engine)
kw_dict = df.reset_index().sort_values(by="intfield2", ascending=False).to_dict(orient="records")

batch_size = 10000
for batch_start in range(0, len(kw_dict), batch_size):
    batch_end = min(batch_start + batch_size, len(kw_dict))
    print("Inserting {}-{}".format(batch_start, batch_end))
    connection.execute(my_table.insert(), kw_dict[batch_start:batch_end])