SQLAlchemy ON DUPLICATE KEY UPDATE

Solution 1:

ON DUPLICATE KEY UPDATE for MySQL in version 1.2 and later

As of version 1.2, this functionality is built into SQLAlchemy, but for MySQL only. somada141's answer below has the best solution: https://stackoverflow.com/a/48373874/319066

ON DUPLICATE KEY UPDATE in the SQL statement

If you want the generated SQL to actually include ON DUPLICATE KEY UPDATE, the simplest way involves using a @compiles decorator.

Example code (linked from a good Reddit thread on the subject) can be found on GitHub:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

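@compiles(Insert)
def append_string(insert, compiler, **kw):
    # Compile the INSERT as usual, then append the raw SQL suffix if one was given
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s


# my_connection, my_table and my_values stand for your own connection, Table and row data.
# This relies on insert() accepting an arbitrary keyword argument, which newer SQLAlchemy releases may reject.
my_connection.execute(my_table.insert(append_string='ON DUPLICATE KEY UPDATE foo=foo'), my_values)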

But note that with this approach you have to write the append_string yourself. You could probably change the append_string function so that it automatically turns the insert statement into one ending in an 'ON DUPLICATE KEY UPDATE' clause, but I'm not going to do that here due to laziness.
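For what it's worth, writing the clause by hand gets tedious with many columns. Here is a minimal sketch of a helper that builds it from a list of column names. The name build_on_duplicate_clause and its exclude parameter are made up for illustration, and it assumes the column names are trusted identifiers that need no quoting:

```python
def build_on_duplicate_clause(columns, exclude=()):
    """Build a MySQL ON DUPLICATE KEY UPDATE clause that overwrites each
    listed column with the value from the attempted insert."""
    assignments = [
        "{0}=VALUES({0})".format(col) for col in columns if col not in exclude
    ]
    if not assignments:
        raise ValueError("no columns left to update")
    return "ON DUPLICATE KEY UPDATE " + ", ".join(assignments)


# Skip the key column itself; only the remaining columns get overwritten.
clause = build_on_duplicate_clause(["id", "foo", "bar"], exclude=["id"])
print(clause)  # ON DUPLICATE KEY UPDATE foo=VALUES(foo), bar=VALUES(bar)
```

The resulting string is what you would pass as append_string in the call above.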

ON DUPLICATE KEY UPDATE functionality within the ORM

SQLAlchemy does not provide an interface to ON DUPLICATE KEY UPDATE or MERGE or any other similar functionality in its ORM layer. However, its session.merge() function can replicate the functionality, but only if the key in question is a primary key.

session.merge(ModelObject) first checks if a row with the same primary key value exists by sending a SELECT query (or by looking it up locally). If it does, it sets a flag somewhere indicating that ModelObject is in the database already, and that SQLAlchemy should use an UPDATE query. Note that merge is quite a bit more complicated than this, but it replicates the functionality well with primary keys.
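To make the merge behaviour concrete, here is a small sketch using an in-memory SQLite database. The Item model is hypothetical, and the imports assume SQLAlchemy 1.4 or later:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):  # hypothetical model, for illustration only
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    data = Column(String(50))


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    # No row with id=1 exists yet, so this ends up as an INSERT.
    session.merge(Item(id=1, data="first"))
    session.commit()

    # A row with id=1 now exists, so the same call ends up as an UPDATE.
    session.merge(Item(id=1, data="second"))
    session.commit()

    merged_rows = [(item.id, item.data) for item in session.query(Item).all()]

print(merged_rows)  # [(1, 'second')]
```

Both calls look identical from the caller's side; the primary key alone decides whether the row is inserted or updated.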

But what if you want ON DUPLICATE KEY UPDATE functionality with a non-primary key (for example, another unique key)? Unfortunately, SQLAlchemy doesn't have any such function. Instead, you have to create something that resembles Django's get_or_create(). Another StackOverflow answer covers it, and I'll just paste a modified, working version of it here for convenience.

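from sqlalchemy.sql.expression import ClauseElement

def get_or_create(session, model, defaults=None, **kwargs):
    # Look for an existing row matching the given column values
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        # Build the new object from the plain lookup values (SQL expressions are skipped)
        params = dict((k, v) for k, v in kwargs.items() if not isinstance(v, ClauseElement))
        if defaults:
            params.update(defaults)
        instance = model(**params)
        # Assumed fix: add the new object so it is inserted on the next flush/commit
        session.add(instance)
        return instance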
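Since the question is about updating on a duplicate key, not just fetching, the same pattern can be stretched into an "update or create". This is only a sketch: update_or_create and the User model are hypothetical names, it assumes SQLAlchemy 1.4 or later, and it runs against in-memory SQLite:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical model with a unique, non-primary key
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True, nullable=False)
    name = Column(String(255))


def update_or_create(session, model, defaults=None, **lookup):
    """Find a row by the lookup columns and update it, or create it."""
    instance = session.query(model).filter_by(**lookup).first()
    if instance is None:
        instance = model(**lookup)
        session.add(instance)
    # Apply the new values whether the row was found or just created.
    for key, value in (defaults or {}).items():
        setattr(instance, key, value)
    return instance


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    update_or_create(session, User, defaults={"name": "Ann"}, email="ann@example.com")
    session.commit()
    update_or_create(session, User, defaults={"name": "Ann B."}, email="ann@example.com")
    session.commit()
    names = [user.name for user in session.query(User).all()]

print(names)  # ['Ann B.']
```

Unlike a real ON DUPLICATE KEY UPDATE, this is a SELECT followed by an INSERT or UPDATE, so it is not atomic: two concurrent callers can both miss the row and one of them will then hit the unique constraint.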

Solution 2:

I should mention that since the v1.2 release, SQLAlchemy Core has a built-in solution to the above, which is described in the documentation (snippet copied below):

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

conn.execute(on_duplicate_key_stmt)
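If you want to see the SQL this produces without a MySQL server at hand, you can compile the statement against the MySQL dialect. A sketch, assuming a made-up my_table definition with id, data and status columns:

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()

# Hypothetical table definition matching the column names used above.
my_table = Table(
    "my_table",
    metadata,
    Column("id", String(32), primary_key=True),
    Column("data", String(64)),
    Column("status", String(1)),
)

insert_stmt = insert(my_table).values(id="some_existing_id", data="inserted value")
upsert_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,  # reuse the value from the attempted insert
    status="U",                      # plain literal applied on conflict
)

# Compiling against the dialect needs neither a database driver nor a connection.
compiled_sql = str(upsert_stmt.compile(dialect=mysql.dialect()))
print(compiled_sql)
```

The printed statement should contain an ON DUPLICATE KEY UPDATE clause; its exact text can differ between SQLAlchemy versions.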

Solution 3:

Based on phsource's answer, and for the specific use-case of using MySQL and completely overriding the data for the same key without performing a DELETE statement, one can use the following @compiles decorated insert expression:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if insert.kwargs.get('on_duplicate_key_update'):
        fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
        generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
        return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
    return s
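The string slicing in that function is easier to follow on a concrete input. Below is a plain-Python sketch of the same steps applied to a sample compiled INSERT; the function name add_on_duplicate and the sample SQL string are made up for illustration:

```python
def add_on_duplicate(sql):
    """Repeat the slicing used above on an already compiled INSERT string."""
    # Take the text between the first "(" and the first ")": the column list.
    fields = sql[sql.find("(") + 1:sql.find(")")].replace(" ", "").split(",")
    # Turn each column into "col=VALUES(col)".
    directive = ["{0}=VALUES({0})".format(field) for field in fields]
    return sql + " ON DUPLICATE KEY UPDATE " + ",".join(directive)


sample = "INSERT INTO my_table (id, data) VALUES (%s, %s)"
result = add_on_duplicate(sample)
print(result)
# INSERT INTO my_table (id, data) VALUES (%s, %s) ON DUPLICATE KEY UPDATE id=VALUES(id),data=VALUES(data)
```

Note that this relies on the first parenthesized group in the statement being the column list, which holds for a plain INSERT INTO table (cols) VALUES (...) but not necessarily for more unusual statements.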

Solution 4:

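It depends on what you want. The code below skips rows that already exist; if you want to replace them instead, pass OR REPLACE rather than OR IGNORE as the prefix. Note that INSERT OR IGNORE and INSERT OR REPLACE are SQLite syntax; MySQL spells these INSERT IGNORE and REPLACE.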

def bulk_insert(self, objects, table):
    # table: your mapped table class; objects: a list of dicts, e.g. [{col1: val1, col2: val2}]
    for counter, row in enumerate(objects, start=1):
        # 'OR IGNORE' skips rows that violate a unique constraint; use 'OR REPLACE' to overwrite them
        inserter = table.__table__.insert().prefix_with('OR IGNORE').values(row)
        try:
            self.db.execute(inserter)
        except Exception as e:
            print(e)
        # Commit every 100 rows
        if counter % 100 == 0:
            self.db.commit()
    # Commit whatever is left over
    self.db.commit()

The commit interval (every 100 rows here) can be raised or lowered to tune the speed.