python pandas to_sql with sqlalchemy : how to speed up exporting to MS SQL?
Solution 1:
I recently had the same problem and feel like to add an answer to this for others.
to_sql
seems to send an INSERT
query for every row which makes it really slow. But since 0.24.0
there is a method
parameter in pandas.to_sql()
where you can define your own insertion function or just use method='multi'
to tell pandas to pass multiple rows in a single INSERT query, which makes it a lot faster.
Note that your Database may has a parameter limit. In that case you also have to define a chunksize.
So the solution should simply look like to this:
my_data_frame.to_sql(TableName, engine, chunksize=<yourParameterLimit>, method='multi')
If you do not know your database parameter limit, just try it without the chunksize parameter. It will run or give you an error telling you your limit.
Solution 2:
The DataFrame.to_sql
method generates insert statements to your ODBC connector which then is treated by the ODBC connector as regular inserts.
When this is slow, it is not the fault of pandas.
Saving the output of the DataFrame.to_sql
method to a file, then replaying that file over an ODBC connector will take the same amount of time.
The proper way of bulk importing data into a database is to generate a csv file and then use a load command, which in the MS flavour of SQL databases is called BULK INSERT
For example:
BULK INSERT mydatabase.myschema.mytable
FROM 'mydatadump.csv';
The syntax reference is as follows:
BULK INSERT
[ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]
FROM 'data_file'
[ WITH
(
[ [ , ] BATCHSIZE = batch_size ]
[ [ , ] CHECK_CONSTRAINTS ]
[ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]
[ [ , ] DATAFILETYPE =
{ 'char' | 'native'| 'widechar' | 'widenative' } ]
[ [ , ] FIELDTERMINATOR = 'field_terminator' ]
[ [ , ] FIRSTROW = first_row ]
[ [ , ] FIRE_TRIGGERS ]
[ [ , ] FORMATFILE = 'format_file_path' ]
[ [ , ] KEEPIDENTITY ]
[ [ , ] KEEPNULLS ]
[ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]
[ [ , ] LASTROW = last_row ]
[ [ , ] MAXERRORS = max_errors ]
[ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]
[ [ , ] ROWS_PER_BATCH = rows_per_batch ]
[ [ , ] ROWTERMINATOR = 'row_terminator' ]
[ [ , ] TABLOCK ]
[ [ , ] ERRORFILE = 'file_name' ]
)]
Solution 3:
You can use this: what makes it faster is the method
parameter of pandas to_sql
. I hope this help helps.
The result of this on my experience was from infinite time to 8 secs.
df = pd.read_csv('test.csv')
conn = create_engine(<connection_string>)
start_time = time.time()
df.to_sql('table_name', conn, method='multi',index=False, if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))
Solution 4:
You can use d6tstack which has fast pandas to SQL functionality because it uses native DB import commands. It supports MS SQL, Postgres and MYSQL
uri_psql = 'postgresql+psycopg2://usr:pwd@localhost/db'
d6tstack.utils.pd_to_psql(df, uri_psql, 'table')
uri_mssql = 'mssql+pymssql://usr:pwd@localhost/db'
d6tstack.utils.pd_to_mssql(df, uri_mssql, 'table', 'schema') # experimental
Also useful for importing multiple CSV with data schema changes and/or preprocess with pandas before writing to db, see further down in examples notebook
d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'),
apply_after_read=apply_fun).to_psql_combine(uri_psql, 'table')
Solution 5:
I was running out of time and memory (more than 18GB allocated for a DataFrame loaded from 120MB CSV) with this line:
df.to_sql('my_table', engine, if_exists='replace', method='multi', dtype={"text_field": db.String(64), "text_field2": db.String(128), "intfield1": db.Integer(), "intfield2": db.Integer(), "floatfield": db.Float()})
Here is the code that helped me to import and track progress of insertions at the same time:
import sqlalchemy as db
engine = db.create_engine('mysql://user:password@localhost:3306/database_name', echo=False)
connection = engine.connect()
metadata = db.MetaData()
my_table = db.Table('my_table', metadata,
db.Column('text_field', db.String(64), index=True),
db.Column('text_field2', db.String(128), index=True),
db.Column('intfield1', db.Integer()),
db.Column('intfield2', db.Integer()),
db.Column('floatfield', db.Float())
)
metadata.create_all(engine)
kw_dict = df.reset_index().sort_values(by="intfield2", ascending=False).to_dict(orient="records")
batch_size=10000
for batch_start in range(0, len(kw_dict), batch_size):
print("Inserting {}-{}".format(batch_start, batch_start + batch_size))
connection.execute(my_table.insert(), kw_dict[batch_start:batch_start + batch_size])