SQLAlchemy ON DUPLICATE KEY UPDATE
Solution 1:
ON DUPLICATE KEY UPDATE
post version-1.2 for MySQL
This functionality is now built into SQLAlchemy for MySQL only. somada141's answer below has the best solution: https://stackoverflow.com/a/48373874/319066
ON DUPLICATE KEY UPDATE
in the SQL statement
If you want the generated SQL to actually include ON DUPLICATE KEY UPDATE
, the simplest way involves using a @compiles
decorator.
The code (linked from a good thread on the subject on reddit) for an example can be found on github:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if 'append_string' in insert.kwargs:
return s + " " + insert.kwargs['append_string']
return s
my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
But note that in this approach, you have to manually create the append_string. You could probably change the append_string function so that it automatically changes the insert string into an insert with 'ON DUPLICATE KEY UPDATE' string, but I'm not going to do that here due to laziness.
ON DUPLICATE KEY UPDATE
functionality within the ORM
SQLAlchemy does not provide an interface to ON DUPLICATE KEY UPDATE
or MERGE
or any other similar functionality in its ORM layer. Nevertheless, it has the session.merge()
function that can replicate the functionality only if the key in question is a primary key.
session.merge(ModelObject)
first checks if a row with the same primary key value exists by sending a SELECT
query (or by looking it up locally). If it does, it sets a flag somewhere indicating that ModelObject is in the database already, and that SQLAlchemy should use an UPDATE
query. Note that merge is quite a bit more complicated than this, but it replicates the functionality well with primary keys.
But what if you want ON DUPLICATE KEY UPDATE
functionality with a non-primary key (for example, another unique key)? Unfortunately, SQLAlchemy doesn't have any such function. Instead, you have to create something that resembles Django's get_or_create()
. Another StackOverflow answer covers it, and I'll just paste a modified, working version of it here for convenience.
def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance
else:
params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
if defaults:
params.update(defaults)
instance = model(**params)
return instance
Solution 2:
I should mention that ever since the v1.2 release, the SQLAlchemy 'core' has a solution to the above with that's built in and can be seen under here (copied snippet below):
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
id='some_existing_id',
data='inserted value')
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)
conn.execute(on_duplicate_key_stmt)
Solution 3:
Based on phsource's answer, and for the specific use-case of using MySQL and completely overriding the data for the same key without performing a DELETE
statement, one can use the following @compiles
decorated insert expression:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if insert.kwargs.get('on_duplicate_key_update'):
fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
return s
Solution 4:
It's depends upon you. If you want to replace then pass OR REPLACE
in prefixes
def bulk_insert(self,objects,table):
#table: Your table class and objects are list of dictionary [{col1:val1, col2:vale}]
for counter,row in enumerate(objects):
inserter = table.__table__.insert(prefixes=['OR IGNORE'], values=row)
try:
self.db.execute(inserter)
except Exception as E:
print E
if counter % 100 == 0:
self.db.commit()
self.db.commit()
Here commit interval can be changed to speed up or speed down